Skip to content

Commit c18f9bb

Browse files
committed
adds annotations to xqueue
1 parent 5c1d7d9 commit c18f9bb

3 files changed

Lines changed: 94 additions & 22 deletions

File tree

.luacheckrc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
std = "luajit"
2+
codes = true
23
globals = {
34
-- Tarantool variable:
45
"box",
@@ -10,8 +11,11 @@ globals = {
1011
}
1112

1213
ignore = {
13-
-- "211",
14+
"211",
15+
"212",
16+
"431",
17+
"432",
18+
"542",
1419
"611",
1520
-- "631",
16-
-- "542"
1721
}

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"Lua.runtime.version": "LuaJIT"
3+
}

xqueue.lua

Lines changed: 85 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ Status:
7474
requires `runat`
7575
`delay` may be set during put, release, kick
7676
turned into R after delay
77-
77+
7878
B - buried - task was temporary discarded from queue by consumer
7979
may be revived using kick by administrator
8080
use it in unpredicted conditions, when man intervention is required
@@ -84,7 +84,7 @@ Status:
8484
8585
D - done - task was processed and ack'ed and permanently left in database
8686
enabled when keep feature is set
87-
87+
8888
X - reserved for statistics
8989
9090
(TODO: reload/upgrade and feature switch)
@@ -219,8 +219,70 @@ local function _table2tuple ( qformat )
219219
return dostring(fun)
220220
end
221221

222+
---@type xqueueSpace
222223
local methods = {}
223224

225+
---@class PrimaryKeyField:table
226+
---@field no number position in tuple
227+
---@field name string name of the field
228+
---@field type "uuid"|"string"|"number"|"unsigned"|"integer"|"boolean" type of the field
229+
230+
---@class xqFeatures: table
231+
---@field id "auto_increment"|"time64"|"uuid"| fun(): scalar (Default: uuid)
232+
---@field retval "tuple" | "table"
233+
---@field buried boolean
234+
---@field delayed boolean
235+
---@field keep boolean
236+
---@field tube boolean
237+
---@field zombie boolean|number
238+
---@field zombie_delay number
239+
---@field ttl boolean|number
240+
---@field ttr boolean|number
241+
---@field ttl_default number?
242+
---@field ttr_default number?
243+
244+
---@class xq:table
245+
---@field NEVER integer (Default: 0)
246+
---@field atomic fun(self: xq, key: scalar, fun: fun(...:any): ...?): ...
247+
---@field bysid table<number,table<string,string>> mapping sid => {key => key}
248+
---@field taken table<string,number> mapping key => sid
249+
---@field _lock table<string,boolean> locks key => boolean (in atomic)
250+
---@field put_wait table<string,{cond:fiber.cond,task:box.tuple?,processed: boolean?}> mapping key => fiber.cond for producer
251+
---@field take_wait fiber.channel
252+
---@field take_chans table<string,fiber.channel> mapping tube => fiber.channel
253+
---@field debug boolean
254+
---@field have_runat boolean
255+
---@field gen_id? fun(): scalar
256+
---@field getkey fun(self: xq, arg: table|scalar|box.tuple): scalar
257+
---@field packkey fun(self: xq, key: any): string
258+
---@field tube_index? boxIndex
259+
---@field index boxIndex
260+
---@field key PrimaryKeyField
261+
---@field fieldmap table<string,number>
262+
---@field timeoffset fun(delta: number): number
263+
---@field features xqFeatures
264+
---@field fields table<string, string|number>
265+
---@field tuple fun(tbl: table): box.tuple
266+
---@field table fun(tuple: box.tuple): table
267+
---@field retwrap fun(t: box.tuple|table): table|box.tuple
268+
---@field wakeup fun(self: xq, t: box.tuple|table)
269+
---@field runat_chan fiber.channel
270+
---@field check_owner fun(self: xq, key: tuple_type|box.tuple): box.tuple
271+
---@field put_back fun(key: table|box.tuple)
272+
---@field _stat { counts: table, transition: table }
273+
---@field putback fun(self: xq, task: table|box.tuple)
274+
---@field _default_truncate fun(space: boxSpaceObject)
275+
---@field _on_repl replaceTrigger
276+
---@field _on_dis fun()
277+
278+
279+
---@class xqueueSpace: boxSpaceObject
280+
---@field xq xq xqueue specific storage
281+
282+
---Upgrades given space to xqueue instance
283+
---@param space xqueueSpace
284+
---@param opts table
285+
---@param depth? number
224286
function M.upgrade(space,opts,depth)
225287
depth = depth or 0
226288
log.info("xqueue upgrade(%s,%s)", space.name, json.encode(opts))
@@ -269,11 +331,11 @@ function M.upgrade(space,opts,depth)
269331
end
270332
})
271333
self.debug = not not opts.debug
272-
334+
273335
if not self._default_truncate then
274336
self._default_truncate = space.truncate
275337
end
276-
338+
277339
local format_av = box.space._space.index.name:get(space.name)[ 7 ]
278340
local format = {}
279341
local have_format = false
@@ -365,7 +427,7 @@ function M.upgrade(space,opts,depth)
365427
self.key = pkf
366428
self.fields = fields
367429
self.fieldmap = fieldmap
368-
430+
369431
if not self._stat then
370432
self._stat = {
371433
counts = {};
@@ -376,7 +438,7 @@ function M.upgrade(space,opts,depth)
376438
self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1
377439
end
378440
end
379-
441+
380442
function self:getkey(arg)
381443
local _type = type(arg)
382444
if _type == 'table' then
@@ -459,6 +521,7 @@ function M.upgrade(space,opts,depth)
459521
end
460522
end
461523

524+
---@type boxIndex
462525
local runat_index
463526
if fields.runat then
464527
for _,index in pairs(space.index) do
@@ -768,7 +831,7 @@ function M.upgrade(space,opts,depth)
768831
end
769832
if #collect >= maxrun then remaining = 0 break end
770833
end
771-
834+
772835
for _,t in ipairs(collect) do
773836
-- log.info("Runat: %s, %s", _, t)
774837
if t[xq.fields.status] == 'W' then
@@ -817,7 +880,7 @@ function M.upgrade(space,opts,depth)
817880
end
818881
return 1
819882
end)
820-
883+
821884
table_clear(collect)
822885

823886
if r then
@@ -900,10 +963,10 @@ function M.upgrade(space,opts,depth)
900963
end
901964
self.ready = nil
902965
end
903-
966+
904967
local meta = debug.getmetatable(space)
905968
for k,v in pairs(methods) do meta[k] = v end
906-
969+
907970
-- Triggers must set right before updating space
908971
-- because raising error earlier leads to trigger inconsistency
909972
self._on_repl = space:on_replace(function(old, new)
@@ -922,7 +985,7 @@ function M.upgrade(space,opts,depth)
922985
else
923986
old_st = 'X'
924987
end
925-
988+
926989
if new then
927990
new_st = new[self.fields.status]
928991
counts[new_st] = (counts[new_st] or 0LL) + 1
@@ -932,15 +995,15 @@ function M.upgrade(space,opts,depth)
932995
else
933996
new_st = 'X'
934997
end
935-
998+
936999
local field = old_st.."-"..new_st
9371000
self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1
9381001
end, self._on_repl)
939-
1002+
9401003
self._on_dis = box.session.on_disconnect(function()
9411004
local sid = box.session.id()
9421005
local peer = box.session.storage.peer
943-
1006+
9441007
log.info("%s: disconnected %s, sid=%s, fid=%s", space.name, peer, sid, fiber.id() )
9451008
box.session.storage.destroyed = true
9461009
if self.bysid[sid] then
@@ -968,7 +1031,7 @@ function M.upgrade(space,opts,depth)
9681031
self.bysid[sid] = nil
9691032
end
9701033
end, self._on_dis)
971-
1034+
9721035
rawset(space,'xq',self)
9731036

9741037
log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status)
@@ -1263,7 +1326,7 @@ function methods:take(timeout, opts)
12631326
local r,e = pcall(function()
12641327
local sid = box.session.id()
12651328
local peer = box.session.storage.peer
1266-
1329+
12671330
-- print("Take ",key," for ",peer," sid=",sid, "; fid=",fiber.id() )
12681331
if xq.debug then
12691332
log.info("Take {%s} by %s, sid=%s, fid=%s", key, peer, sid, fiber.id())
@@ -1350,6 +1413,7 @@ function methods:release(key, attr)
13501413
xq:atomic(key,function()
13511414
t = self:update({key}, update)
13521415

1416+
---@cast t box.tuple
13531417
xq:wakeup(t)
13541418
if xq.have_runat then
13551419
xq.runat_chan:put(true,0)
@@ -1358,9 +1422,9 @@ function methods:release(key, attr)
13581422
log.info("Rel: %s->%s {%s} +%s from %s/sid=%s/fid=%s", old, t[xq.fields.status],
13591423
key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
13601424
end)
1361-
1425+
13621426
xq:putback(t)
1363-
1427+
13641428
return t
13651429
end
13661430

@@ -1402,6 +1466,7 @@ function methods:ack(key, attr)
14021466
xq:atomic(key,function()
14031467
if #update > 0 then
14041468
t = self:update({key}, update)
1469+
---@cast t box.tuple
14051470
xq:wakeup(t)
14061471
if xq.have_runat then
14071472
xq.runat_chan:put(true,0)
@@ -1450,7 +1515,7 @@ end
14501515

14511516
local function kick_task(self, key, attr)
14521517
local xq = self.xq
1453-
local key = xq:getkey(key)
1518+
key = xq:getkey(key)
14541519
local peer = box.session.storage.peer
14551520
local sid = box.session.id()
14561521
attr = attr or {}
@@ -1480,7 +1545,7 @@ end
14801545

14811546
function methods:kill(key)
14821547
local xq = self.xq
1483-
local key = xq:getkey(key)
1548+
key = xq:getkey(key)
14841549
local task = self:get(key)
14851550
local status = task[xq.fields.status]
14861551
local peer = box.session.storage.peer

0 commit comments

Comments
 (0)