Skip to content

Commit 8417048

Browse files
committed
example: internal queue
1 parent c18f9bb commit 8417048

4 files changed

Lines changed: 744 additions & 0 deletions

File tree

example-internal-queue/init.lua

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env tarantool
-- Example: internal queue based web crawler.
-- Boots a single Tarantool node, loads the queue schema, and configures a
-- Queue instance with two tubes: 'parse' (extract links from HTML) and
-- 'download' (fetch pages).

local log = require 'log'
local http = require 'http.client'
local fun = require 'fun'
local uri = require 'uri'

-- Single-node setup: listen on 3301 with 2 GiB of memtx memory.
box.cfg { listen = 3301, memtx_memory = 2^31 }
-- NOTE(review): 'super' for guest is acceptable for an example, not for
-- production deployments.
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

-- Project-local module that defines the 'queue' space used below.
require 'schema'

---@type Queue
local q = require 'queue' {
    take_timeout = 1,
    space = 'queue',

    -- Keep finished tasks around ("zombie") for an hour before removal.
    zombie = true,
    zombie_delay = 3600,

    -- Retry policy: exponential backoff, at most 3 attempts.
    backoff = {
        delay_base = 1.5,
        max_delay = 15,
        max_attempt = 3,
    },

    -- workers = 0 disables the default worker pool; the per-tube worker
    -- definitions are commented out, so processing is driven manually
    -- (e.g. from the console started at the bottom of this file).
    workers = 0,
    tubes = {
        -- { tube = 'parse', workers = 4 },
        -- { tube = 'download', workers = 2 },
    },
}
--- Global failure hook, invoked by the queue when a task handler fails.
-- The original body was empty, silently swallowing every failure; log the
-- error instead so broken tasks are visible to the operator.
-- @param err the error value raised by a task handler
function q:on_fail(err)
    log.error("queue task failed: %s", err)
end
-- Pre-flight check for 'parse' tasks, run when a task is taken. Returning
-- false tells the queue to skip the task; returning the task itself lets
-- processing continue in the handler.
q:on_take('parse', function(task)
    local payload_body = task.payload.body

    -- A non-string body can never be parsed: bury the task permanently.
    if type(payload_body) ~= 'string' then
        task:bury("body is not a string")
        return false
    end

    -- Bodies above 1 MiB are acknowledged as unprocessed and not parsed.
    local too_large = #payload_body > 2^20
    if too_large then
        q:ack(task, { processed = false }, { keep = false, delay = 1 })
        return false
    end

    return task
end)
-- Handler for the 'parse' tube: extract wikipedia.org links from a fetched
-- HTML body, enqueue each unseen link as a 'download' task, then ack this
-- task with its body truncated (the full HTML is no longer needed).
q:handle('parse', function (task)
    log.info("Start processing %s", task)

    local body = task.payload.body
    ---@cast body string

    ---@type string[]
    local urls = {}
    for href in body:gmatch([[href="(https://[^"]+)"]]) do
        local u = uri.parse(href)
        -- Fix: in a Lua pattern '.' is a wildcard — escape it
        -- ("wikipedia%.org") so a host like "wikipediaXorg" does not match;
        -- this also matches the filter used by the 'download' on_take hook.
        -- Also guard u.host: uri.parse may yield a table without a host
        -- component, and :match on nil would raise.
        if type(u) == 'table'
            and (u.scheme == 'http' or u.scheme == 'https')
            and u.host and u.host:match("wikipedia%.org")
        then
            local link = uri.format(u)
            if link then
                table.insert(urls, link)
            end
        end
    end

    -- Deduplicate: turn links into map keys, then collect them back.
    ---@type string[]
    urls = fun.totable(fun.zip(urls, fun.ones()):tomap())

    log.info("From url %s discovered %d links", task.dedup, #urls)

    -- Enqueue discovered links and ack the current task in one transaction,
    -- so either all follow-up work is registered or none of it is.
    box.atomic(function()
        for _,link in pairs(urls) do
            q:put({ dedup=link, tube='download', kind='download', payload={url=link, timeout=3} }, {skip_if_exists = true})
        end
        -- Drop the (large) body before persisting the finished task.
        task.payload.body = '...'
        task:ack({ processed_at = os.time(), links = urls }, { update = { payload = task.payload } })
    end)
end)
-- Pre-flight check for 'download' tasks: bury tasks whose URL is missing
-- or points outside wikipedia.org.
q:on_take('download', function(task)
    local url = task.payload.url
    if type(url) ~= 'string' then
        task:bury("malformed url")
        return false
    end

    if not url:match("wikipedia%.org") then
        task:bury("outside wikipedia.org")
        return false
    end

    -- Fix(consistency): the 'parse' on_take hook returns the task on
    -- success, but this hook returned nothing (nil). Return the task here
    -- too — if the queue treats a falsy return as "skip this task",
    -- downloads would otherwise never be processed.
    return task
end)
-- Handler for the 'download' tube: fetch the URL; on success enqueue a
-- 'parse' task with the fetched HTML. Non-200 responses are retried via
-- backoff; oversized bodies are acked without follow-up work.
q:handle('download', function(task)
    -- log.info("Start processing %s", task)

    local url = task.payload.url
    local timeout = task.payload.timeout
    ---@cast url string
    ---@cast timeout number

    local result = http.get(url, { timeout = timeout })
    -- Fix(robustness): result.body may be nil (e.g. connection or timeout
    -- errors carry no body); default it so :sub() and # below cannot raise
    -- inside the handler and mask the real HTTP failure.
    local body = result.body or ''

    if result.status ~= 200 then
        return task:backoff(("http not 200: %s:%s"):format(result.status, body:sub(1, 1024)))
    end

    if #body > 2^20 then
        return task:ack({ status = 200, processed_at = os.time(), reason = "Body is too long" })
    end

    -- Enqueue the parse task and ack this one atomically, so the fetched
    -- page is either both recorded and scheduled for parsing, or neither.
    box.atomic(function()
        q:put({ dedup = url, tube = 'parse', kind = 'parse', payload = { body = body } }, { skip_if_exists = true })
        task:ack({ status = 200, processed_at = os.time() })
    end)
end)
-- Seed the crawl with a single start page. restart_if_exists re-activates
-- the task if a copy with the same dedup key is already in the queue.
q:put({
    kind = 'download',
    tube = 'download',
    dedup = 'https://en.wikipedia.org/wiki/Comparison_sort',
    payload = { url = 'https://en.wikipedia.org/wiki/Comparison_sort', timeout = 10 },
}, {
    restart_if_exists = true,
})

-- Expose the queue as a global for the interactive console, then hand
-- control to the REPL; os.exit(0) runs when the console session ends.
rawset(_G, 'queue', q)
require 'console'.start()
os.exit(0)

0 commit comments

Comments
 (0)