@@ -11,119 +11,119 @@ require 'schema'
1111
1212--- @type Queue
1313local q = require ' queue' {
14- take_timeout = 1 ,
15- space = ' queue' ,
16-
17- zombie = true ,
18- zombie_delay = 3600 ,
19-
20- backoff = {
21- delay_base = 1.5 ,
22- max_delay = 15 ,
23- max_attempt = 3 ,
24- },
25-
26- workers = 0 ,
27- tubes = {
28- -- { tube = 'parse', workers = 4 },
29- -- { tube = 'download', workers = 2 },
30- },
14+ take_timeout = 1 ,
15+ space = ' queue' ,
16+
17+ zombie = true ,
18+ zombie_delay = 3600 ,
19+
20+ backoff = {
21+ delay_base = 1.5 ,
22+ max_delay = 15 ,
23+ max_attempt = 3 ,
24+ },
25+
26+ workers = 0 ,
27+ tubes = {
28+ -- { tube = 'parse', workers = 4 },
29+ -- { tube = 'download', workers = 2 },
30+ },
3131}
3232
3333function q :on_fail (err )
3434
3535end
3636
3737q :on_take (' parse' , function (task )
38- local body = task .payload .body
39- if type (body ) ~= ' string' then
40- task :bury (" body is not a string" )
41- return false
42- end
43-
44- if # body > 2 ^ 20 then
45- q :ack (task , { processed = false }, { keep = false , delay = 1 })
46- return false
47- end
48- return task
38+ local body = task .payload .body
39+ if type (body ) ~= ' string' then
40+ task :bury (" body is not a string" )
41+ return false
42+ end
43+
44+ if # body > 2 ^ 20 then
45+ q :ack (task , { processed = false }, { keep = false , delay = 1 })
46+ return false
47+ end
48+ return task
4949end )
5050
5151q :handle (' parse' , function (task )
52- log .info (" Start processing %s" , task )
53-
54- local body = task .payload .body
55- --- @cast body string
56-
57- --- @type string[]
58- local urls = {}
59- for href in body :gmatch ([[ href="(https://[^"]+)"]] ) do
60- local u = uri .parse (href )
61- if type (u ) == ' table' and (u .scheme == ' http' or u .scheme == ' https' ) and u .host :match (" wikipedia.org" ) then
62- local link = uri .format (u )
63- if link then
64- table.insert (urls , link )
65- end
66- end
67- end
68-
69- -- uniquealize urls:
70- --- @type string[]
71- urls = fun .totable (fun .zip (urls , fun .ones ()):tomap ())
72-
73- log .info (" From url %s discovered %d links" , task .dedup , # urls )
74-
75- box .atomic (function ()
76- for _ ,link in pairs (urls ) do
77- q :put ({ dedup = link , tube = ' download' , kind = ' download' , payload = {url = link , timeout = 3 } }, {skip_if_exists = true })
78- end
79- task .payload .body = ' ...'
80- task :ack ({ processed_at = os.time (), links = urls }, { update = { payload = task .payload } })
81- end )
52+ log .info (" Start processing %s" , task )
53+
54+ local body = task .payload .body
55+ --- @cast body string
56+
57+ --- @type string[]
58+ local urls = {}
59+ for href in body :gmatch ([[ href="(https://[^"]+)"]] ) do
60+ local u = uri .parse (href )
61+ if type (u ) == ' table' and (u .scheme == ' http' or u .scheme == ' https' ) and u .host :match (" wikipedia.org" ) then
62+ local link = uri .format (u )
63+ if link then
64+ table.insert (urls , link )
65+ end
66+ end
67+ end
68+
69+ -- uniquealize urls:
70+ --- @type string[]
71+ urls = fun .totable (fun .zip (urls , fun .ones ()):tomap ())
72+
73+ log .info (" From url %s discovered %d links" , task .dedup , # urls )
74+
75+ box .atomic (function ()
76+ for _ ,link in pairs (urls ) do
77+ q :put ({ dedup = link , tube = ' download' , kind = ' download' , payload = {url = link , timeout = 3 } }, {skip_if_exists = true })
78+ end
79+ task .payload .body = ' ...'
80+ task :ack ({ processed_at = os.time (), links = urls }, { update = { payload = task .payload } })
81+ end )
8282end )
8383
8484q :on_take (' download' , function (task )
85- local url = task .payload .url
86- if type (url ) ~= ' string' then
87- task :bury (" malformed url" )
88- return false
89- end
90-
91- if not url :match (" wikipedia%.org" ) then
92- task :bury (" outside wikipedia.org" )
93- return false
94- end
85+ local url = task .payload .url
86+ if type (url ) ~= ' string' then
87+ task :bury (" malformed url" )
88+ return false
89+ end
90+
91+ if not url :match (" wikipedia%.org" ) then
92+ task :bury (" outside wikipedia.org" )
93+ return false
94+ end
9595end )
9696
9797q :handle (' download' , function (task )
98- -- log.info("Start processing %s", task)
99-
100- local url = task .payload .url
101- local timeout = task .payload .timeout
102- --- @cast url string
103- --- @cast timeout number
104-
105- local result = http .get (url , { timeout = timeout })
106- if result .status ~= 200 then
107- return task :backoff ((" http not 200: %s:%s" ):format (result .status , result .body :sub (1 , 1024 )))
108- end
109-
110- if # result .body > 2 ^ 20 then
111- return task :ack ({ status = 200 , processed_at = os.time (), reason = " Body is too long" })
112- end
113-
114- box .atomic (function ()
115- q :put ({ dedup = url , tube = ' parse' , kind = ' parse' , payload = { body = result .body } }, { skip_if_exists = true })
116- task :ack ({ status = 200 , processed_at = os.time () })
117- end )
98+ -- log.info("Start processing %s", task)
99+
100+ local url = task .payload .url
101+ local timeout = task .payload .timeout
102+ --- @cast url string
103+ --- @cast timeout number
104+
105+ local result = http .get (url , { timeout = timeout })
106+ if result .status ~= 200 then
107+ return task :backoff ((" http not 200: %s:%s" ):format (result .status , result .body :sub (1 , 1024 )))
108+ end
109+
110+ if # result .body > 2 ^ 20 then
111+ return task :ack ({ status = 200 , processed_at = os.time (), reason = " Body is too long" })
112+ end
113+
114+ box .atomic (function ()
115+ q :put ({ dedup = url , tube = ' parse' , kind = ' parse' , payload = { body = result .body } }, { skip_if_exists = true })
116+ task :ack ({ status = 200 , processed_at = os.time () })
117+ end )
118118end )
119119
120120q :put ({
121- kind = ' download' ,
122- tube = ' download' ,
123- dedup = ' https://en.wikipedia.org/wiki/Comparison_sort' ,
124- payload = { url = ' https://en.wikipedia.org/wiki/Comparison_sort' , timeout = 10 },
121+ kind = ' download' ,
122+ tube = ' download' ,
123+ dedup = ' https://en.wikipedia.org/wiki/Comparison_sort' ,
124+ payload = { url = ' https://en.wikipedia.org/wiki/Comparison_sort' , timeout = 10 },
125125}, {
126- restart_if_exists = true ,
126+ restart_if_exists = true ,
127127})
128128
129129rawset (_G , ' queue' , q )
0 commit comments