openresty互斥锁

代码实现及分析

可用于设定单台openresty实例上的互斥锁,下面简单分析下代码实现

互斥锁是利用shared.dict进程数据共享及所有操作都是原子操作的特点实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
...
local FREE_LIST_REF = 0
-- memo存储引用id与对象之间的映射关系
-- 对象可以是字符串、table各种类型
local memo = {}
-- 获取对象的引用id
local function ref_obj(key)
if key == nil then
return -1
end
local ref = memo[FREE_LIST_REF]
if ref and ref ~= 0 then
memo[FREE_LIST_REF] = memo[ref]
else
ref = #memo + 1
end
memo[ref] = key
-- print("ref key_id returned ", ref)
return ref
end
-- 根据引用id接触memo中的对象映射关系
local function unref_obj(ref)
if ref >= 0 then
memo[ref] = memo[FREE_LIST_REF]
memo[FREE_LIST_REF] = ref
end
end
-- gc回调函数,删除cdata.dict_id/key_id在memo中的映射关系
local function gc_lock(cdata)
local dict_id = tonumber(cdata.dict_id)
local key_id = tonumber(cdata.key_id)
-- print("key_id: ", key_id, ", key: ", memo[key_id], "dict: ",
-- type(memo[cdata.dict_id]))
if key_id > 0 then
local key = memo[key_id]
unref_obj(key_id)
local dict = memo[dict_id]
-- print("dict.delete type: ", type(dict.delete))
local ok, err = dict:delete(key)
if not ok then
log(ERR, 'failed to delete key "', key, '": ', err)
end
cdata.key_id = 0
end
unref_obj(dict_id)
end
-- 生成新类型的创建函数
local ctype = ffi.metatype("struct { int key_id; int dict_id; }",
{ __gc = gc_lock })
-- 初始化,生成库操作对象
function _M.new(_, dict_name, opts)
local dict = shared[dict_name]
if not dict then
return nil, "dictionary not found"
end
-- 开辟空间,生成ctype类型的变量
local cdata = ffi_new(ctype)
cdata.key_id = 0
cdata.dict_id = ref_obj(dict) --获取dict引用id
local timeout, exptime, step, ratio, max_step
if opts then
timeout = opts.timeout
exptime = opts.exptime
step = opts.step
ratio = opts.ratio
max_step = opts.max_step
end
if not exptime then
exptime = 30
end
if timeout then
timeout = min(timeout, exptime)
if step then
step = min(step, timeout)
end
end
local self = {
cdata = cdata,
dict = dict,
timeout = timeout or 5, -- 获取锁等待超时时间
exptime = exptime, -- 锁自动过期时间
step = step or 0.001, -- 初始step时间 等待锁时每隔step时间会尝试再去尝试获取锁
ratio = ratio or 2, -- 比率,step以ratio利率增长 step = step*ratio
max_step = max_step or 0.5, -- 最大step时间 达到最大step后不再增长
}
return setmetatable(self, mt)
end
-- 获取锁
function _M.lock(self, key)
if not key then
return nil, "nil key"
end
local dict = self.dict
local cdata = self.cdata
-- cdata.key_id大于0表示锁已被占用
if cdata.key_id > 0 then
return nil, "locked"
end
local exptime = self.exptime
-- dict:add只作用于不存在的key,如果key已存在会返回false,"exists"
local ok, err = dict:add(key, true, exptime)
if ok then
cdata.key_id = ref_obj(key)
self.key = key
return 0
end
if err ~= "exists" then
return nil, err
end
-- lock held by others
local step = self.step
local ratio = self.ratio
local timeout = self.timeout
local max_step = self.max_step
local elapsed = 0
-- 每隔step时间,再去获取锁
while timeout > 0 do
sleep(step)
elapsed = elapsed + step
timeout = timeout - step
local ok, err = dict:add(key, true, exptime)
if ok then
-- cdata.key_id变为大于0
cdata.key_id = ref_obj(key)
self.key = key
return elapsed
end
if err ~= "exists" then
return nil, err
end
if timeout <= 0 then
break
end
step = min(max(0.001, step * ratio), timeout, max_step)
end
return nil, "timeout"
end
-- 解锁
function _M.unlock(self)
local dict = self.dict
local cdata = self.cdata
local key_id = tonumber(cdata.key_id)
-- key_id小于等于0表示已解锁
if key_id <= 0 then
return nil, "unlocked"
end
local key = memo[key_id]
-- 清理映射关系中存储的key_id信息
unref_obj(key_id)
-- 清理共享字典中的key信息
local ok, err = dict:delete(key)
if not ok then
return nil, err
end
cdata.key_id = 0
return 1
end
-- 重新设置过期时间
function _M.expire(self, time)
local dict = self.dict
local cdata = self.cdata
local key_id = tonumber(cdata.key_id)
if key_id <= 0 then
return nil, "unlocked"
end
if not time then
time = self.exptime
end
-- 利用replace重新设置过期时间
local ok, err = dict:replace(self.key, true, time)
if not ok then
return nil, err
end
return true
end
return _M

缓存锁使用示例

The basic workflow for a cache lock is as follows:

  1. Check the cache for a hit with the key. If a cache miss happens, proceed to step 2.
  2. Instantiate a resty.lock object, call the lock method on the key, and check the 1st return value, i.e., the lock waiting time. If it is nil, handle the error; otherwise proceed to step 3.
  3. Check the cache again for a hit. If it is still a miss, proceed to step 4; otherwise release the lock by calling unlock and then return the cached value.
  4. Query the backend (the data source) for the value, put the result into the cache, and then release the lock currently held by calling unlock.

Below is a kinda complete code example that demonstrates the idea.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
local resty_lock = require "resty.lock"
local cache = ngx.shared.my_cache
-- step 1:
local val, err = cache:get(key)
if val then
ngx.say("result: ", val)
return
end
if err then
return fail("failed to get key from shm: ", err)
end
-- cache miss!
-- step 2:
local lock, err = resty_lock:new("my_locks")
if not lock then
return fail("failed to create lock: ", err)
end
local elapsed, err = lock:lock(key)
if not elapsed then
return fail("failed to acquire the lock: ", err)
end
-- lock successfully acquired!
-- step 3:
-- 在某个请求拿到互斥锁完成缓存写入后,其他请求可以依次拿到锁,所以这里要再次检查缓存内容
-- someone might have already put the value into the cache
-- so we check it here again:
val, err = cache:get(key)
if val then
local ok, err = lock:unlock()
if not ok then
return fail("failed to unlock: ", err)
end
ngx.say("result: ", val)
return
end
--- step 4:
local val = fetch_redis(key)
if not val then
local ok, err = lock:unlock()
if not ok then
return fail("failed to unlock: ", err)
end
-- FIXME: we should handle the backend miss more carefully
-- here, like inserting a stub value into the cache.
ngx.say("no value found")
return
end
-- update the shm cache with the newly fetched value
local ok, err = cache:set(key, val, 1)
if not ok then
local ok, err = lock:unlock()
if not ok then
return fail("failed to unlock: ", err)
end
return fail("failed to update shm cache: ", err)
end
local ok, err = lock:unlock()
if not ok then
return fail("failed to unlock: ", err)
end
ngx.say("result: ", val)

资料

lua-resty-lock标准库