Skip to content

Commit b1da67b

Browse files
authored
Adds method :wait (#15)
Authored-by: Esiutkin Ivan <i.esiutkin@corp.mail.ru>
1 parent 2d78a64 commit b1da67b

2 files changed

Lines changed: 106 additions & 32 deletions

File tree

example-putwait/init.lua

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ require 'xqueue'.upgrade(box.space.myqueue, {
6969
local test = require 'tap'.test("putwait")
7070
local fiber = require 'fiber'
7171

72-
fiber.create(function()
72+
local function worker()
7373
-- worker:
7474
fiber.yield()
7575

@@ -92,22 +92,43 @@ fiber.create(function()
9292

9393
-- Yeah! We've managed to process it:
9494
box.space.myqueue:ack(task)
95-
end)
95+
end
96+
97+
do
98+
-- consumer:
99+
fiber.create(worker)
100+
-- producer:
101+
local task, processed = box.space.myqueue:put({
102+
id = 1, payload = { initiated = true }
103+
}, { wait = 3 })
104+
105+
test:ok(task, "task was returned for producer")
106+
test:ok(processed, "task was processed")
107+
test:is(box.space.myqueue:len(), 0, "queue is empty")
108+
109+
task, processed = box.space.myqueue:put({
110+
id = 2, payload = newmap(),
111+
}, { wait = 2, ttl = 1 })
112+
test:ok(task, "task was returned for producer")
113+
test:is(processed, false, "task wasn't processed because was killed by TTL")
114+
test:is(box.space.myqueue:len(), 0, "queue is empty")
115+
end
116+
117+
do
118+
-- consumer:
119+
fiber.create(worker)
120+
-- producer:
121+
local task = box.space.myqueue:put({
122+
id = 3, payload = { initiated = true }
123+
})
124+
125+
local processed
126+
task, processed = box.space.myqueue:wait(task, 3)
127+
test:ok(task, "task was returned for producer")
128+
test:ok(processed, "task was processed")
129+
130+
test:is(box.space.myqueue:len(), 0, "queue is empty")
131+
end
96132

97-
-- producer:
98-
local task, processed = box.space.myqueue:put({
99-
id = 1, payload = { initiated = true }
100-
}, { wait = 3 })
101-
102-
test:ok(task, "task was returned for producer")
103-
test:ok(processed, "task was processed")
104-
test:is(box.space.myqueue:len(), 0, "queue is empty")
105-
106-
task, processed = box.space.myqueue:put({
107-
id = 2, payload = newmap(),
108-
}, { wait = 2, ttl = 1 })
109-
test:ok(task, "task was returned for producer")
110-
test:is(processed, false, "task wasn't processed because was killed by TTL")
111-
test:is(box.space.myqueue:len(), 0, "queue is empty")
112133

113134
os.exit()

xqueue.lua

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -657,31 +657,40 @@ function M.upgrade(space,opts,depth)
657657
-- Producer should not be notified if queue is able to process task in future.
658658
local function notify_producer(key, task)
659659
local pkey = self:packkey(key)
660-
local chan = self.put_wait[pkey]
661-
if not chan or not chan:has_readers() then
660+
local wait = self.put_wait[pkey]
661+
if not wait then
662662
-- no producer
663663
return
664664
end
665+
665666
if task.status == 'Z' or task.status == 'D' then
666667
-- task was processed
667-
chan:put({ task, true }, 0)
668+
wait.task = task
669+
wait.processed = true
670+
wait.cond:broadcast()
668671
return
669672
end
670673
if not space:get{key} then
671674
-- task is not present in space
672675
-- it could be TTL or :ack
673676
if task.status == 'T' then
674677
-- task was acked:
675-
chan:put({ task, true }, 0)
678+
wait.task = task
679+
wait.processed = true
680+
wait.cond:broadcast()
676681
elseif task.status == 'R' then
677682
-- task was killed by TTL
678-
chan:put({ task, false }, 0)
683+
wait.task = task
684+
wait.processed = false
685+
wait.cond:broadcast()
679686
end
680687
else
681688
-- task is still in space
682689
if task.status == 'B' then
683690
-- task was buried
684-
chan:put({ task, false }, 0)
691+
wait.task = task
692+
wait.processed = false
693+
wait.cond:broadcast()
685694
end
686695
end
687696
end
@@ -1098,29 +1107,73 @@ function methods:put(t, opts)
10981107
local tuple = xq.tuple(t)
10991108
local key = tuple[ xq.key.no ]
11001109

1101-
local chan
1110+
local wait
11021111
if opts.wait then
1103-
chan = fiber.channel(1)
1104-
xq.put_wait[xq:packkey(key)] = chan
1112+
wait = { cond = fiber.cond() }
1113+
xq.put_wait[xq:packkey(key)] = wait
11051114
end
11061115

11071116
xq:atomic(key, function()
11081117
t = self:insert(tuple)
11091118
end)
11101119
xq:wakeup(t)
11111120

1112-
if chan then
1121+
if wait then
11131122
-- local func notify_producer will send to us some data
1114-
local msg = chan:get(opts.wait)
1115-
if msg then
1116-
xq.put_wait[xq:packkey(key)] = nil
1117-
chan:close()
1118-
return xq.retwrap(msg[1]), msg[2]
1123+
local ok = wait.cond:wait(opts.wait)
1124+
fiber.testcancel()
1125+
if ok and wait.task then
1126+
return xq.retwrap(wait.task), wait.processed
11191127
end
11201128
end
11211129
return xq.retwrap(t)
11221130
end
11231131

1132+
--[[
1133+
* `space:wait(id, timeout)`
1134+
- `id`:
1135+
+ `string` | `number` - primary key
1136+
- `timeout` - number of seconds to wait
1137+
* callee fiber will be blocked up to `timeout` seconds until task won't
1138+
be processed or timeout reached.
1139+
]]
1140+
1141+
local wait_for = {
1142+
R = true,
1143+
T = true,
1144+
W = true,
1145+
}
1146+
1147+
function methods:wait(key, timeout)
1148+
local xq = self.xq
1149+
key = xq:getkey(key)
1150+
local task = self:get(key)
1151+
if not task then
1152+
error(("Task {%s} was not found"):format(key))
1153+
end
1154+
1155+
local status = task[xq.fields.status]
1156+
if not wait_for[status] then
1157+
return xq.retwrap(task), false
1158+
end
1159+
1160+
local pkey = xq:packkey(key)
1161+
local wait = xq.put_wait[pkey]
1162+
if not wait then
1163+
wait = { cond = fiber.cond() }
1164+
xq.put_wait[pkey] = wait
1165+
end
1166+
1167+
-- local func notify_producer will send to us some data
1168+
local ok = wait.cond:wait(timeout)
1169+
fiber.testcancel()
1170+
if ok and wait.task then
1171+
return xq.retwrap(wait.task), wait.processed
1172+
end
1173+
1174+
return xq.retwrap(task), false
1175+
end
1176+
11241177
--[[
11251178
* `space:take(timeout)`
11261179
* `space:take(timeout, opts)`

0 commit comments

Comments
 (0)