
Persist external_executor_id for CeleryExecutor in _send_workloads() to avoid "lost" events during scheduler termination #64971

@seanmuth

Description

Generated with Claude; I wanted to open an issue to discuss this proposed fix before opening a PR.

When a scheduler dies in the window between sending a task to Celery and processing the resulting QUEUED executor event, external_executor_id is never written to the task_instance table. When a new scheduler takes over and calls adopt_or_reset_orphaned_tasks, CeleryExecutor.try_adopt_task_instances cannot adopt the task (it requires a non-None external_executor_id to construct the AsyncResult for Celery state lookup), so the task is reset and re-queued instead.

Use case/motivation

Root cause

external_executor_id is only set as a side effect of handle_executor_events when it processes a QUEUED or RUNNING state event:

scheduler_job_runner.py

  if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
      ti.external_executor_id = info

The Celery task_id (result.task_id) is available immediately after apply_async() returns in _send_workloads(), but it is only stored in the in-memory event_buffer:

celery_executor.py

  self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)

If the scheduler process dies before the next handle_executor_events cycle flushes that value to the DB, the task_id is lost. The new scheduler's executor starts with an empty workloads dict and event buffer, so there is no way to recover it.
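To make the failure window concrete, here is a minimal simulation (hypothetical names, not Airflow code) of why an in-memory event buffer loses the Celery task_id when the scheduler dies before the next handle_executor_events cycle:

```python
# Stand-in for the task_instance table in the metadata DB.
task_instance_table = {"dag.task": {"external_executor_id": None}}


class Executor:
    def __init__(self):
        # Like CeleryExecutor.event_buffer: process-local, gone on restart.
        self.event_buffer = {}

    def send_workload(self, key, celery_task_id):
        # The task_id is known right here, but only buffered in memory.
        self.event_buffer[key] = ("queued", celery_task_id)

    def handle_executor_events(self):
        # The DB write happens later, in a separate scheduler-loop step.
        for key, (_state, task_id) in self.event_buffer.items():
            task_instance_table[key]["external_executor_id"] = task_id
        self.event_buffer.clear()


executor = Executor()
executor.send_workload("dag.task", "celery-task-id-123")
# Scheduler dies here, before handle_executor_events() ever runs.

executor = Executor()  # new scheduler: fresh executor, empty buffer
can_adopt = task_instance_table["dag.task"]["external_executor_id"] is not None
print(can_adopt)  # False: the task_id never reached the DB, so adoption fails
```

If handle_executor_events() had run before the crash, the task_id would be in the table and the new scheduler could adopt the task.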

try_adopt_task_instances explicitly skips tasks without a task_id (with a comment acknowledging this limitation):

celery_executor.py

  for ti in tis:
      if ti.external_executor_id is not None:
          celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
      else:
          not_adopted_tis.append(ti)  # cannot adopt — no Celery task_id to look up

Observed behavior

A task instance in running state owned by a dead scheduler job is reset instead of adopted, triggering a duplicate execution. The scheduler logs show:

  Reset the following 1 orphaned TaskInstances:
      <TaskInstance: dag.task scheduled__... [running]>

with no prior "Setting external_executor_id" log for the task's first try.

Expected behavior

The Celery task_id should survive a scheduler restart. The new scheduler should be able to adopt the in-flight task and poll its Celery state rather than resetting it.

Proposed fix

Persist external_executor_id directly to the database in _send_workloads() immediately after a successful apply_async(), rather than waiting for it to be written as a side effect of event processing:

celery_executor.py — _send_workloads()

  elif result is not None:
      result.backend = cached_celery_backend
      self.running.add(key)
      self.workloads[key] = result
      self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
      # Persist the Celery task_id immediately so it survives a scheduler restart.
      # Currently it is only written as a side effect of handle_executor_events,
      # which creates a window where a crash leaves external_executor_id as None
      # and prevents adoption by a new scheduler.
      self._persist_external_executor_id(key, result.task_id, session)

This eliminates the race window between task submission and event processing. The event_buffer path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.
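One possible shape for the hypothetical _persist_external_executor_id helper (the name comes from the snippet above; no such method exists today). The real implementation would go through Airflow's SQLAlchemy session and TaskInstance model; this sketch uses stdlib sqlite3 against a minimal task_instance table purely to show the write:

```python
import sqlite3


def _persist_external_executor_id(key, celery_task_id, conn):
    """Write the Celery task_id to task_instance immediately after
    apply_async() returns, so it survives a scheduler crash that happens
    before the next handle_executor_events cycle."""
    dag_id, task_id, run_id, map_index = key  # TaskInstanceKey-like tuple
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (celery_task_id, dag_id, task_id, run_id, map_index),
    )
    conn.commit()


# Demo against a throwaway in-memory table with only the relevant columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, "
    "map_index INTEGER, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('dag', 'task', 'r1', -1, NULL)")
_persist_external_executor_id(("dag", "task", "r1", -1), "celery-task-id-123", conn)
row = conn.execute("SELECT external_executor_id FROM task_instance").fetchone()
print(row[0])  # celery-task-id-123
```

Whether the write uses the executor's existing session or a short-lived one is the open question noted below; the helper itself only needs a handle it can commit on.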

Notes

  • The existing code comment in try_adopt_task_instances already acknowledges this race condition as a known limitation. This fix resolves it.
  • A session would need to be threaded into _send_workloads or the write issued via a short-lived session, depending on how the call chain is structured.
  • The handle_executor_events path that sets external_executor_id from QUEUED/RUNNING events should be kept as a reconciliation path for any case where the DB write is missed.

Related issues

#37784

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
