Description
Generated with Claude. I wanted to open an issue to have some discussion on this proposed fix before opening a PR.
When a scheduler dies in the window between sending a task to Celery and processing the resulting QUEUED executor event, external_executor_id is never written to the task_instance table. When a new scheduler takes over and calls adopt_or_reset_orphaned_tasks, CeleryExecutor.try_adopt_task_instances cannot adopt the task (it requires a non-None
external_executor_id to construct the AsyncResult for Celery state lookup), so the task is reset and re-queued instead.
Use case/motivation
Root cause
external_executor_id is only set as a side effect of handle_executor_events when it processes a QUEUED or RUNNING state event:
scheduler_job_runner.py
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
    ti.external_executor_id = info
The Celery task_id (result.task_id) is available immediately after apply_async() returns in _send_workloads(), but it is only stored in the in-memory event_buffer:
celery_executor.py
self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
If the scheduler process dies before the next handle_executor_events cycle flushes that value to the DB, the task_id is lost. The new scheduler's executor starts with an empty workloads dict and event buffer, so there is no way to recover it.
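The race can be sketched in a few lines (names simplified; this is an illustration, not Airflow's actual code). The Celery task_id exists only in an in-memory buffer until the next event-processing cycle writes it to the DB, so a crash in between loses it:

```python
# Minimal sketch of the race window: the Celery task_id lives only in an
# in-memory dict until the next handle_executor_events cycle flushes it.
event_buffer = {}
db = {"external_executor_id": None}  # stand-in for the task_instance row

def send_workload(key, celery_task_id):
    # Step 1: task sent to Celery; the id is recorded only in memory.
    event_buffer[key] = ("queued", celery_task_id)

def handle_executor_events():
    # Step 2: a later cycle flushes the id to the DB.
    for _key, (_state, task_id) in event_buffer.items():
        db["external_executor_id"] = task_id

send_workload("dag.task", "celery-uuid-1")
# If the scheduler dies here, event_buffer is gone and the DB still holds None:
assert db["external_executor_id"] is None
handle_executor_events()
assert db["external_executor_id"] == "celery-uuid-1"
```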
try_adopt_task_instances explicitly skips tasks without a task_id (with a comment acknowledging this limitation):
celery_executor.py
for ti in tis:
    if ti.external_executor_id is not None:
        celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
    else:
        not_adopted_tis.append(ti)  # cannot adopt — no Celery task_id to look up
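The adoption decision above reduces to a simple partition on external_executor_id. A simplified stand-in (dicts instead of TaskInstance objects; not the real Airflow code) makes the consequence concrete:

```python
# Hypothetical sketch of the adoption decision in try_adopt_task_instances.
# Tasks with a stored Celery task_id can be adopted (Airflow builds an
# AsyncResult from the id and polls its state); tasks where the id was
# never persisted have no handle into the result backend and are reset.

def partition_for_adoption(task_instances):
    """Split TIs into (adoptable-by-id, not-adopted) on external_executor_id."""
    celery_ids = {}
    not_adopted = []
    for ti in task_instances:
        if ti["external_executor_id"] is not None:
            celery_ids[ti["external_executor_id"]] = ti
        else:
            not_adopted.append(ti)  # cannot adopt: no Celery task_id to look up
    return celery_ids, not_adopted

tis = [
    {"key": "dag.a", "external_executor_id": "abc-123"},
    {"key": "dag.b", "external_executor_id": None},  # scheduler died pre-flush
]
adoptable, reset = partition_for_adoption(tis)
# adoptable is keyed by "abc-123"; reset contains dag.b
```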
Observed behavior
A task instance in running state owned by a dead scheduler job is reset instead of adopted, triggering a duplicate execution. The scheduler logs show:
Reset the following 1 orphaned TaskInstances:
<TaskInstance: dag.task scheduled__... [running]>
with no prior "Setting external_executor_id" log for the task's first try.
Expected behavior
The Celery task_id should survive a scheduler restart. The new scheduler should be able to adopt the in-flight task and poll its Celery state rather than resetting it.
Proposed fix
Persist external_executor_id directly to the database in _send_workloads() immediately after a successful apply_async(), rather than waiting for it to be written as a side effect of event processing:
celery_executor.py — _send_workloads()
elif result is not None:
    result.backend = cached_celery_backend
    self.running.add(key)
    self.workloads[key] = result
    self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
    # Persist the Celery task_id immediately so it survives a scheduler restart.
    # Currently this is only written as a side effect of handle_executor_events,
    # which creates a window where a crash leaves external_executor_id as None
    # and prevents adoption by a new scheduler.
    self._persist_external_executor_id(key, result.task_id, session)
This eliminates the race window between task submission and event processing. The event_buffer path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.
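One possible shape for the proposed helper (this method does not exist in Airflow today; the sketch uses stdlib sqlite3 as a stand-in for the metadata DB, and the real version would go through the executor's SQLAlchemy session): a narrow UPDATE keyed on the task instance's identity, issued immediately after apply_async.

```python
# Hypothetical sketch of _persist_external_executor_id (not real Airflow code).
# Illustrates the narrow UPDATE on task_instance that writes the Celery
# task_id as soon as it is known, instead of waiting for event processing.
import sqlite3

def persist_external_executor_id(conn, dag_id, task_id, run_id, map_index, celery_task_id):
    """Write the Celery task_id to the task_instance row and commit."""
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (celery_task_id, dag_id, task_id, run_id, map_index),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, "
    "map_index INTEGER, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('d', 't', 'r', -1, NULL)")
persist_external_executor_id(conn, "d", "t", "r", -1, "celery-uuid-1")
row = conn.execute("SELECT external_executor_id FROM task_instance").fetchone()
# row now carries the Celery task_id, so a restarted scheduler can adopt
```

Because the write commits independently of the event buffer, a crash at any later point leaves the id recoverable from the DB.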
Notes
- The existing code comment in try_adopt_task_instances already acknowledges this race condition as a known limitation. This fix resolves it.
- A session would need to be threaded into _send_workloads or the write issued via a short-lived session, depending on how the call chain is structured.
- The handle_executor_events path that sets external_executor_id from QUEUED/RUNNING events should be kept as a reconciliation path for any case where the DB write is missed.
Related issues
#37784
Are you willing to submit a PR?
Code of Conduct