Skip to content

Commit d2a80c4

Browse files
artikzstefano-maggiolo
authored andcommitted
Fix first sweeper run
1 parent 4ceb644 commit d2a80c4

6 files changed

Lines changed: 21 additions & 31 deletions

File tree

cms/io/triggeredservice.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,10 @@ def __init__(self, shard):
212212

213213
self._executors = []
214214

215-
# Set up and spawn the sweeper.
216-
#
217-
# TODO: link to greenlet and react to its death.
218215
self._sweeper_start = None
219216
self._sweeper_event = Event()
220-
gevent.spawn(self._sweeper_loop)
217+
self._sweeper_started = False
218+
self._sweeper_timeout = None
221219

222220
def add_executor(self, executor):
223221
"""Add an executor for the service.
@@ -264,21 +262,27 @@ def dequeue(self, operation):
264262
for executor in self._executors:
265263
executor.dequeue(operation)
266264

267-
def _sweeper_timeout(self):
268-
"""Return how frequently to run the sweeper loop.
265+
def start_sweeper(self, timeout):
266+
"""Start sweeper loop with given timeout.
269267
270-
return (float|None): timeout in seconds, or None for no
271-
sweeping.
268+
timeout (float): timeout in seconds.
272269
273270
"""
274-
return None
271+
if not self._sweeper_started:
272+
self._sweeper_started = True
273+
self._sweeper_timeout = timeout
274+
275+
# TODO: link to greenlet and react to its death.
276+
gevent.spawn(self._sweeper_loop)
277+
else:
278+
logger.warning("Service tried to start the sweeper loop twice.")
275279

276280
def _sweeper_loop(self):
277281
"""Regularly check for missed operations.
278282
279-
Run the sweep once every _sweeper_timeout() seconds but make
283+
Run the sweep once every _sweeper_timeout seconds but make
280284
sure that no two sweeps run simultaneously. That is, start a
281-
new sweep _sweeper_timeout() seconds after the previous one
285+
new sweep _sweeper_timeout seconds after the previous one
282286
started or when the previous one finished, whatever comes
283287
last.
284288
@@ -291,10 +295,6 @@ def _sweeper_loop(self):
291295
suppressed, because the loop must go on.
292296
293297
"""
294-
# If the timeout is None, it means the subclass does not want
295-
# a sweeper.
296-
if self._sweeper_timeout() is None:
297-
return
298298
while True:
299299
self._sweeper_start = monotonic_time()
300300
self._sweeper_event.clear()
@@ -306,7 +306,7 @@ def _sweeper_loop(self):
306306
"operations.", exc_info=True)
307307

308308
self._sweeper_event.wait(max(self._sweeper_start +
309-
self._sweeper_timeout() -
309+
self._sweeper_timeout -
310310
monotonic_time(), 0))
311311

312312
def _sweep(self):

cms/service/EvaluationService.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,7 @@ def __init__(self, shard, contest_id):
847847
ServiceCoord("ScoringService", 0))
848848

849849
self.add_executor(EvaluationExecutor(self))
850+
self.start_sweeper(117.0)
850851

851852
self.add_timeout(self.check_workers_timeout, None,
852853
EvaluationService.WORKER_TIMEOUT_CHECK_TIME
@@ -857,9 +858,6 @@ def __init__(self, shard, contest_id):
857858
.total_seconds(),
858859
immediately=False)
859860

860-
def _sweeper_timeout(self):
861-
return 117.0
862-
863861
def submission_enqueue_operations(self, submission, check_again=False):
864862
"""Push in queue the operations required by a submission.
865863

cms/service/PrintingService.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,13 @@ def __init__(self, shard):
231231
self.file_cacher = FileCacher(self)
232232

233233
self.add_executor(PrintingExecutor(self.file_cacher))
234+
self.start_sweeper(61.0)
234235

235236
if config.printer is None:
236237
logger.info("Printing is disabled, so the PrintingService is "
237238
"idle.")
238239
return
239240

240-
def _sweeper_timeout(self):
241-
return 61.0
242-
243241
def _missing_operations(self):
244242
"""Enqueue unprinted print jobs.
245243

cms/service/ProxyService.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,11 @@ def __init__(self, shard, contest_id):
261261
self.rankings = list()
262262
for ranking in config.rankings:
263263
self.add_executor(ProxyExecutor(ranking.encode('utf-8')))
264+
self.start_sweeper(347.0)
264265

265266
# Send some initial data to rankings.
266267
self.initialize()
267268

268-
def _sweeper_timeout(self):
269-
return 347.0
270-
271269
def _missing_operations(self):
272270
"""Return a generator of data to be sent to the rankings..
273271

cms/service/ScoringService.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,7 @@ def __init__(self, shard):
153153
must_be_present=ranking_enabled)
154154

155155
self.add_executor(ScoringExecutor(self.proxy_service))
156-
157-
def _sweeper_timeout(self):
158-
return 347.0
156+
self.start_sweeper(347.0)
159157

160158
def _missing_operations(self):
161159
"""Return a generator of unscored submission results.

cmstestsuite/unit_tests/io/triggeredservice_test.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,6 @@ def __init__(self, shard, timeout):
8686
def add_missing_operation(self, operation):
8787
self._operations.append(operation)
8888

89-
def _sweeper_timeout(self):
90-
return self._timeout
91-
9289
def _missing_operations(self):
9390
counter = 0
9491
while self._operations != []:
@@ -114,6 +111,7 @@ def setUpService(self, timeout=None, batch=False):
114111
self.service = FakeTriggeredService(0, timeout)
115112
for notifier in self.notifiers:
116113
self.service.add_executor(FakeExecutor(notifier))
114+
self.service.start_sweeper(timeout)
117115

118116
def test_success(self):
119117
"""Test a simple success case."""

0 commit comments

Comments
 (0)