Skip to content

Commit 190ba3f

Browse files
committed
Provides per tube consumer channels and put+wait draft
1 parent 27126e7 commit 190ba3f

2 files changed

Lines changed: 129 additions & 23 deletions

File tree

example-tube/init.lua

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
box.cfg{
2-
-- wal_mode = 'none',
3-
}
1+
box.cfg{}
42

53
require 'strict'.on()
64

@@ -17,7 +15,6 @@ local format = {
1715
-- utube stands for unordered tubes
1816
box.schema.create_space('utube', {
1917
format = format,
20-
temporary = true,
2118
if_not_exists = true,
2219
})
2320

@@ -38,6 +35,8 @@ box.space.utube:create_index('run', {
3835
if_not_exists = true,
3936
})
4037

38+
box.space.utube:truncate()
39+
4140
do
4241
local fio = require 'fio'
4342
local root = fio.abspath(debug.getinfo(1).source:gsub("@","")):gsub("[^/]+$", "")
@@ -57,21 +56,24 @@ require 'xqueue'.upgrade(box.space.utube, {
5756
id = 'time64',
5857
delayed = true,
5958
buried = true,
60-
zombie = 60,
6159

6260
retval = 'tuple',
6361
},
6462
})
6563

64+
local fiber = require 'fiber'
65+
local test = require 'tap'.test("tubes")
66+
6667
for i = 1, 10 do
67-
box.space.utube:put({
68+
local task = box.space.utube:put({
6869
tube = "tube-" .. i,
6970
payload = {
7071
id = i,
7172
ctime = require 'clock'.realtime64(),
7273
},
7374
nice = i,
7475
})
76+
test:ok(task, "task was returned by :put")
7577
end
7678

7779
for i = 10, 1, -1 do
@@ -80,22 +82,82 @@ for i = 10, 1, -1 do
8082
tube = tube,
8183
})
8284

83-
assert(task, "task not taken from tube: " .. tube)
84-
assert(task.tube == tube, "task must be taken from specific tube")
85+
test:ok(task, "task was taken from tube: " .. tube)
86+
test:is(task.tube, tube, "task is from the same tube consumer requested")
8587

8688
local notask = box.space.utube:take({
8789
tube = tube,
8890
timeout = 0.5,
8991
})
9092

91-
assert(not notask, "task was not taken because tube is empty")
93+
test:isnil(notask, "no task because tube " .. tube .. " is empty")
9294

9395
if i % 2 == 0 then
94-
box.space.utube:ack(task)
96+
local ret = box.space.utube:ack(task)
97+
test:ok(ret, ":ack returned processed task")
98+
test:is(ret.id, task.id, ":ack returned same task with same task.id")
9599
else
96-
box.space.utube:release(task)
100+
local ret = box.space.utube:release(task)
101+
test:ok(ret, ":release returned processed task")
102+
test:is(ret.id, task.id, ":release returned same task with same task.id")
103+
test:is(ret.status, 'R', ":release retutned task in status 'R'")
104+
end
105+
end
106+
107+
do
108+
-- common worker:
109+
for i = 1, box.space.utube:len() do
110+
local task = box.space.utube:take(0)
111+
test:ok(task, ":take returned task from queue")
112+
local tube = 'tube-' .. (2*i-1)
113+
test:is(task.tube, tube, "task must be from tube: " .. tube)
114+
115+
local ret = box.space.utube:release(task, { delay = 1 })
116+
test:is(ret.id, task.id, ":release successfully returned task back to queue")
117+
test:is(ret.status, "W", "task was returned to status 'W' because was released with delay")
118+
end
119+
end
120+
121+
-- clears queue:
122+
box.space.utube:truncate()
123+
124+
-- Let's check how take+put works:
125+
do
126+
local first = fiber.channel(1)
127+
local second = fiber.channel(1)
128+
129+
local tubename = 'tube'
130+
131+
fiber.create(function()
132+
-- creating this fiber we initiated new session
133+
-- so only this fiber takes ownership for this task
134+
second:put(box.space.utube:take(1), 0)
135+
end)
136+
137+
fiber.create(function()
138+
first:put(box.space.utube:take({
139+
tube = tubename,
140+
timeout = 1,
141+
}), 0)
142+
end)
143+
144+
fiber.sleep(0.1)
145+
146+
for i = 1, 2 do
147+
box.space.utube:put({
148+
tube = tubename,
149+
payload = { id = i },
150+
nice = 512,
151+
})
97152
end
153+
154+
local task1 = first:get(0.1)
155+
test:ok(task1, "tube consumer must receive task")
156+
test:is(task1.payload.id, 1, "tube consumer must receive task first")
157+
158+
local task2 = second:get(0.1)
159+
test:ok(task2, "common consumer must receive task")
160+
test:is(task2.payload.id, 2, "common consumer must receive task second")
98161
end
99162

100-
require 'console'.start()
101163
os.exit()

xqueue.lua

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ function M.upgrade(space,opts,depth)
245245
self.bysid = space.xq.bysid
246246
self._lock = space.xq._lock
247247
self.take_wait = space.xq.take_wait
248+
self.take_chans = space.xq.take_chans or {}
248249
self._on_repl = space.xq._on_repl
249250
self._on_dis = space.xq._on_dis
250251
else
@@ -253,6 +254,7 @@ function M.upgrade(space,opts,depth)
253254
-- byfid = {};
254255
self._lock = {}
255256
self.take_wait = fiber.channel(0)
257+
self.take_chans = setmetatable({}, { __mode = 'v' })
256258
end
257259
setmetatable(self.bysid, {
258260
__serialize='map',
@@ -808,6 +810,15 @@ function M.upgrade(space,opts,depth)
808810

809811
function self:wakeup(t)
810812
if t[self.fieldmap.status] ~= 'R' then return end
813+
if self.fieldmap.tube then
814+
-- we may have consumers in the tubes:
815+
local tube_chan = self.take_chans[t[self.fieldmap.tube]]
816+
if tube_chan and tube_chan:has_readers() and tube_chan:put(true, 0) then
817+
-- we have successfully notified consumer
818+
return
819+
end
820+
-- otherwise fallback to default channel:
821+
end
811822
if self.take_wait:has_readers() then
812823
self.take_wait:put(true,0)
813824
end
@@ -953,6 +964,9 @@ end
953964
* if set, task will become `W` instead of `R` for `delay` seconds
954965
+ `ttl` - number of seconds
955966
* if set, task will be discarded after ttl seconds unless was taken
967+
+ `wait` - number of seconds
968+
* if set, callee fiber will be blocked up to `wait` seconds until task won't
969+
be processed or timeout reached.
956970
957971
```lua
958972
box.space.myqueue:put{ name="xxx"; data="yyy"; }
@@ -1002,6 +1016,10 @@ function methods:put(t, opts)
10021016
end
10031017
t[ xq.fieldmap.status ] = 'W'
10041018
t[ xq.fieldmap.runat ] = xq.timeoffset(opts.delay)
1019+
1020+
if opts.wait then
1021+
error("Are you crazy? Call of :put({...}, { wait = <>, delay = <> }) looks weird", 2)
1022+
end
10051023
elseif opts.ttl then
10061024
if not xq.features.ttl then
10071025
error("Feature ttl is not enabled",2)
@@ -1021,11 +1039,23 @@ function methods:put(t, opts)
10211039
local tuple = xq.tuple(t)
10221040
local key = tuple[ xq.key.no ]
10231041

1024-
xq._lock[ key ] = true
1025-
t = self:insert( tuple )
1026-
xq._lock[ key ] = nil
1042+
local chan
1043+
if opts.wait then
1044+
chan = fiber.channel(1)
1045+
xq.put_wait[key] = chan
1046+
end
10271047

1048+
xq:atomic(key, function()
1049+
t = self:insert(tuple)
1050+
end)
10281051
xq:wakeup(t)
1052+
1053+
if chan then
1054+
local res = chan:get(opts.wait)
1055+
if res then
1056+
return xq.retwrap(res), true
1057+
end
1058+
end
10291059
return xq.retwrap(t)
10301060
end
10311061

@@ -1036,6 +1066,7 @@ end
10361066
- `timeout` - number of seconds to wait for new task
10371067
+ choose reasonable time
10381068
+ beware of **readahead** size (see tarantool docs)
1069+
- `tube` - name of the tube worker wants to take task from (feature tube must be enabled)
10391070
- returns task tuple or table (see retval) or nothing on timeout
10401071
- *TODO*: ttr must be there
10411072
]]
@@ -1065,12 +1096,18 @@ function methods:take(timeout, opts)
10651096
local index
10661097
local start_with
10671098

1099+
local tube_chan
10681100
if opts.tube then
10691101
if not xq.features.tube then
10701102
error("Feature tube is not enabled", 2)
10711103
end
1104+
1105+
assert(type(opts.tube) == 'string', "opts.tube must be a string")
1106+
10721107
index = xq.tube_index
10731108
start_with = {opts.tube, 'R'}
1109+
tube_chan = xq.take_chans[opts.tube] or fiber.channel()
1110+
xq.take_chans[opts.tube] = tube_chan
10741111
else
10751112
index = xq.index
10761113
start_with = {'R'}
@@ -1082,9 +1119,7 @@ function methods:take(timeout, opts)
10821119
while not found do
10831120
for _,t in index:pairs(start_with, { iterator = box.index.EQ }) do
10841121
key = t[ xq.key.no ]
1085-
if xq._lock[ key ] then
1086-
-- continue
1087-
else
1122+
if not xq._lock[ key ] then
10881123
-- found key
10891124
xq._lock[ key ] = true
10901125
found = t
@@ -1093,13 +1128,22 @@ function methods:take(timeout, opts)
10931128
end
10941129
if not found then
10951130
local left = (now + timeout) - fiber.time()
1096-
if left <= 0 then return end
1097-
xq.take_wait:get(left)
1098-
if box.session.storage.destroyed then return end
1099-
else
1100-
break
1131+
if left <= 0 then goto finish end
1132+
1133+
(tube_chan or xq.take_wait):get(left)
1134+
if box.session.storage.destroyed then goto finish end
11011135
end
11021136
end
1137+
::finish::
1138+
1139+
-- If we were last reader from the tube
1140+
-- we remove channel. Writers will get false
1141+
-- on :put if they exists.
1142+
if tube_chan and not tube_chan:has_readers() then
1143+
tube_chan:close()
1144+
xq.take_chans[opts.tube] = nil
1145+
end
1146+
if not found then return end
11031147

11041148
local r,e = pcall(function()
11051149
local sid = box.session.id()

0 commit comments

Comments
 (0)