|
15 | 15 | from django.core.exceptions import ObjectDoesNotExist |
16 | 16 | from django.core.files.base import ContentFile |
17 | 17 | from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F |
| 18 | +from django.db import transaction |
18 | 19 | from django.utils.text import slugify |
19 | 20 | from django.utils.timezone import now |
20 | 21 | from rest_framework.exceptions import ValidationError |
@@ -141,14 +142,6 @@ def _send_to_compute_worker(submission, is_scoring): |
141 | 142 | submission = Submission.objects.get(id=submission.id) |
142 | 143 | task = submission.task |
143 | 144 |
|
144 | | - # priority of scoring tasks is higher, we don't want to wait around for |
145 | | - # many submissions to be scored while we're waiting for results |
146 | | - if is_scoring: |
147 | | - # higher numbers are higher priority |
148 | | - priority = 10 |
149 | | - else: |
150 | | - priority = 0 |
151 | | - |
152 | 145 | if not is_scoring: |
153 | 146 | run_args['prediction_result'] = make_url_sassy( |
154 | 147 | path=submission.prediction_result.name, |
@@ -201,40 +194,45 @@ def _send_to_compute_worker(submission, is_scoring): |
201 | 194 | time_padding = 60 * 20 # 20 minutes |
202 | 195 | time_limit = submission.phase.execution_time_limit + time_padding |
203 | 196 |
|
204 | | - if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue |
205 | | - submission.queue_name = submission.phase.competition.queue.name or '' |
206 | | - run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit |
207 | | - submission.save() |
208 | | - |
209 | | - # Send to special queue? Using `celery_app` var name here since we'd be overriding the imported `app` |
210 | | - # variable above |
211 | | - celery_app = app_or_default() |
212 | | - with celery_app.connection() as new_connection: |
213 | | - new_connection.virtual_host = str(submission.phase.competition.queue.vhost) |
214 | | - task = celery_app.send_task( |
| 197 | + def _enqueue_after_commit(): |
| 198 | + # priority of scoring tasks is higher, we don't want to wait around for |
| 199 | + # many submissions to be scored while we're waiting for results |
| 200 | + priority = 10 if is_scoring else 0 |
| 201 | + |
| 202 | + if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue |
| 203 | + submission.queue_name = submission.phase.competition.queue.name or '' |
| 204 | + run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit |
| 205 | + submission.save(update_fields=["queue_name"]) |
| 206 | + celery_app = app_or_default() |
| 207 | + with celery_app.connection() as new_connection: |
| 208 | + new_connection.virtual_host = str(submission.phase.competition.queue.vhost) |
| 209 | + task = celery_app.send_task( |
| 210 | + 'compute_worker_run', |
| 211 | + args=(run_args,), |
| 212 | + queue='compute-worker', |
| 213 | + soft_time_limit=time_limit, |
| 214 | + connection=new_connection, |
| 215 | + priority=priority, |
| 216 | + ) |
| 217 | + else: |
| 218 | + task = app.send_task( |
215 | 219 | 'compute_worker_run', |
216 | 220 | args=(run_args,), |
217 | 221 | queue='compute-worker', |
218 | 222 | soft_time_limit=time_limit, |
219 | | - connection=new_connection, |
220 | 223 | priority=priority, |
221 | 224 | ) |
222 | | - else: |
223 | | - task = app.send_task( |
224 | | - 'compute_worker_run', |
225 | | - args=(run_args,), |
226 | | - queue='compute-worker', |
227 | | - soft_time_limit=time_limit, |
228 | | - priority=priority, |
229 | | - ) |
230 | | - submission.celery_task_id = task.id |
231 | 225 |
|
232 | | - if submission.status == Submission.SUBMITTING: |
233 | | - # Don't want to mark an already-prepared submission as "submitted" again, so |
234 | | - # only do this if we were previously "SUBMITTING" |
235 | | - submission.status = Submission.SUBMITTED |
| 226 | + submission.celery_task_id = task.id |
| 227 | + |
| 228 | + if submission.status == Submission.SUBMITTING: |
| 229 | + # Don't want to mark an already-prepared submission as "submitted" again, so |
| 230 | + # only do this if we were previously "SUBMITTING" |
| 231 | + submission.status = Submission.SUBMITTED |
| 232 | + |
| 233 | + submission.save(update_fields=["celery_task_id", "status"]) |
236 | 234 |
|
237 | | - submission.save() |
| 235 | + transaction.on_commit(_enqueue_after_commit) |
238 | 236 |
|
239 | 237 |
|
240 | 238 | def create_detailed_output_file(detail_name, submission): |
|
0 commit comments