Skip to content

Commit 242f653

Browse files
committed
Adds put+wait notifications from TTL
1 parent 190ba3f commit 242f653

1 file changed

Lines changed: 65 additions & 12 deletions

File tree

xqueue.lua

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,16 @@ function M.upgrade(space,opts,depth)
245245
self.bysid = space.xq.bysid
246246
self._lock = space.xq._lock
247247
self.take_wait = space.xq.take_wait
248-
self.take_chans = space.xq.take_chans or {}
248+
self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
249+
self.put_wait = space.xq.put_wait or setmetatable({}, { __mode = 'v' })
249250
self._on_repl = space.xq._on_repl
250251
self._on_dis = space.xq._on_dis
251252
else
252253
self.taken = {}
253254
self.bysid = {}
254255
-- byfid = {};
255256
self._lock = {}
257+
self.put_wait = setmetatable({}, { __mode = 'v' })
256258
self.take_wait = fiber.channel(0)
257259
self.take_chans = setmetatable({}, { __mode = 'v' })
258260
end
@@ -393,6 +395,14 @@ function M.upgrade(space,opts,depth)
393395
end
394396
end
395397

398+
function self:packkey(key)
399+
if type(key) == 'cdata' then
400+
return tostring(ffi.cast("uint64_t", key))
401+
else
402+
return key
403+
end
404+
end
405+
396406
do
397407
local filter
398408
if fields.priority then
@@ -639,6 +649,43 @@ function M.upgrade(space,opts,depth)
639649
return t[pkf.no]
640650
end
641651

652+
-- Notify producer if it is still waiting us.
653+
-- Producer waits only for successfully processed task
654+
-- or for task which would never be processed.
655+
-- Discarding to process task depends on consumer's logic.
656+
-- Also task can be deleted from space by exceeding TTL.
657+
-- Producer should not be notified if queue is able to process task in future.
658+
local function notify_producer(key, task)
659+
local pkey = self:packkey(key)
660+
local chan = self.put_wait[pkey]
661+
if not chan or not chan:has_readers() then
662+
-- no producer
663+
return
664+
end
665+
if task.status == 'Z' or task.status == 'D' then
666+
-- task was processed
667+
chan:put({ task, true }, 0)
668+
return
669+
end
670+
if not space:get{key} then
671+
-- task is not present in space
672+
-- it could be TTL or :ack
673+
if task.status == 'T' then
674+
-- task was acked:
675+
chan:put({ task, true }, 0)
676+
elseif task.status == 'R' then
677+
-- task was killed by TTL
678+
chan:put({ task, false }, 0)
679+
end
680+
else
681+
-- task is still in space
682+
if task.status == 'B' then
683+
-- task was buried
684+
chan:put({ task, false }, 0)
685+
end
686+
end
687+
end
688+
642689
if opts.worker then
643690
local workers = opts.workers or 1
644691
local worker = opts.worker
@@ -714,8 +761,10 @@ function M.upgrade(space,opts,depth)
714761
})
715762
xq:wakeup(u)
716763
elseif t[xq.fields.status] == 'R' and xq.features.ttl then
717-
log.info("Runat: Kill R by ttl %s (%+0.2fs)",xq:keyfield(t), fiber.time() - t[ xq.fields.runat ])
718-
space:delete{ xq:keyfield(t) }
764+
local key = xq:keyfield(t)
765+
log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
766+
t = space:delete{key}
767+
notify_producer(key, t)
719768
elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
720769
log.info("Runat: Kill Zombie %s",xq:keyfield(t))
721770
space:delete{ xq:keyfield(t) }
@@ -792,7 +841,8 @@ function M.upgrade(space,opts,depth)
792841
return t
793842
end
794843

795-
function self:putback(key)
844+
function self:putback(task)
845+
local key = task[ self.key.no ]
796846
local sid = self.taken[ key ]
797847
if sid then
798848
self.taken[ key ] = nil
@@ -804,9 +854,9 @@ function M.upgrade(space,opts,depth)
804854
else
805855
log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
806856
end
807-
-- TODO: putwait
808-
end
809857

858+
notify_producer(key, task)
859+
end
810860

811861
function self:wakeup(t)
812862
if t[self.fieldmap.status] ~= 'R' then return end
@@ -1042,7 +1092,7 @@ function methods:put(t, opts)
10421092
local chan
10431093
if opts.wait then
10441094
chan = fiber.channel(1)
1045-
xq.put_wait[key] = chan
1095+
xq.put_wait[xq:packkey(key)] = chan
10461096
end
10471097

10481098
xq:atomic(key, function()
@@ -1051,9 +1101,12 @@ function methods:put(t, opts)
10511101
xq:wakeup(t)
10521102

10531103
if chan then
1054-
local res = chan:get(opts.wait)
1055-
if res then
1056-
return xq.retwrap(res), true
1104+
-- local func notify_producer will send to us some data
1105+
local msg = chan:get(opts.wait)
1106+
if msg then
1107+
xq.put_wait[xq:packkey(key)] = nil
1108+
chan:close()
1109+
return xq.retwrap(msg[1]), msg[2]
10571110
end
10581111
end
10591112
return xq.retwrap(t)
@@ -1244,7 +1297,7 @@ function methods:release(key, attr)
12441297
key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
12451298
end)
12461299

1247-
xq:putback(key,attr)
1300+
xq:putback(t)
12481301

12491302
return t
12501303
end
@@ -1302,7 +1355,7 @@ function methods:ack(key, attr)
13021355
end
13031356
end)
13041357

1305-
xq:putback(key) -- in real drop form taken key
1358+
xq:putback(t) -- in real drop form taken key
13061359

13071360
return t
13081361
end

0 commit comments

Comments
 (0)