1+ box .cfg {
2+ listen = ' 127.0.0.1:4221'
3+ }
4+ box .once (' access:v1' , function ()
5+ box .schema .user .grant (' guest' , ' read,write,execute' , ' universe' )
6+ end )
7+
8+ local function newmap (arg )
9+ arg = arg or {}
10+ local mt = getmetatable (arg )
11+ if mt then
12+ mt .__serialize = ' map'
13+ else
14+ setmetatable (arg , { __serialize = ' map' })
15+ end
16+ return arg
17+ end
18+
19+ require ' strict' .on ()
20+
21+ local format = {
22+ { name = " id" , type = " number" }, -- taskid
23+ { name = " status" , type = " string" }, -- status
24+ { name = " runat" , type = " number" }, -- runat
25+ { name = " payload" , type = " map" }, -- payload
26+ }
27+ local _ , e = box .schema .create_space (' myqueue' , {
28+ format = format ,
29+ if_not_exists = true
30+ })
31+
32+ if e ~= ' created' then
33+ box .space .myqueue :format (format )
34+ end
35+
36+ box .space .myqueue :create_index (' primary' , {
37+ parts = {" id" },
38+ if_not_exists = true ,
39+ })
40+ box .space .myqueue :create_index (' xq' , {
41+ parts = {" status" , " id" },
42+ if_not_exists = true ,
43+ })
44+ box .space .myqueue :create_index (' run' , {
45+ parts = {" runat" ," id" },
46+ if_not_exists = true ,
47+ })
48+
49+ if not package.path :match (' %.%./%?%.lua;' ) then
50+ package.path = ' ../?.lua;' .. package.path
51+ end
52+
53+ require ' xqueue' .upgrade (box .space .myqueue , {
54+ fields = {
55+ status = ' status' ;
56+ runat = ' runat'
57+ };
58+ features = {
59+ id = " time64" ,
60+ delayed = true ,
61+ ttr = 10 ,
62+ ttl = true ,
63+
64+ retval = ' tuple' ,
65+ };
66+ debug = true
67+ })
68+
69+ local test = require ' tap' .test (" putwait" )
70+ local fiber = require ' fiber'
71+
72+ fiber .create (function ()
73+ -- worker:
74+ fiber .yield ()
75+
76+ local task = box .space .myqueue :take (1 )
77+ test :ok (task , " task was taken #1" )
78+
79+ box .space .myqueue :release (task , {
80+ update = {
81+ { ' =' , 4 , newmap { was_taken = true } },
82+ }
83+ })
84+
85+ -- take task back
86+ task = box .space .myqueue :take (1 )
87+ test :ok (task , " task was taken #2" )
88+
89+ box .space .myqueue :update ({ task .id }, {
90+ { ' =' , 4 , newmap { processed = true } },
91+ })
92+
93+ -- Yeah! We've managed to process it:
94+ box .space .myqueue :ack (task )
95+ end )
96+
97+ -- producer:
98+ local task , processed = box .space .myqueue :put ({
99+ id = 1 , payload = { initiated = true }
100+ }, { wait = 3 })
101+
102+ test :ok (task , " task was returned for producer" )
103+ test :ok (processed , " task was processed" )
104+ test :is (box .space .myqueue :len (), 0 , " queue is empty" )
105+
106+ task , processed = box .space .myqueue :put ({
107+ id = 2 , payload = newmap (),
108+ }, { wait = 2 , ttl = 1 })
109+ test :ok (task , " task was returned for producer" )
110+ test :is (processed , false , " task wasn't processed because was killed by TTL" )
111+ test :is (box .space .myqueue :len (), 0 , " queue is empty" )
112+
113+ os.exit ()
0 commit comments