Skip to content

refactor(consensus): Cleanup Remaining Task Queue#2533

Draft
refcell wants to merge 11 commits into
mainfrom
rf/refactor/direct-engine-insert
Draft

refactor(consensus): Cleanup Remaining Task Queue#2533
refcell wants to merge 11 commits into
mainfrom
rf/refactor/direct-engine-insert

Conversation

@refcell
Copy link
Copy Markdown
Contributor

@refcell refcell commented May 5, 2026

Summary

This moves unsafe payload insertion out of the engine task queue and onto direct Engine methods. The remaining queue now only owns seal, consolidate, delegated forkchoice, and finalize work, while sequencer insert acknowledgement still waits for the inserted unsafe head. External unsafe inserts keep the existing retry and reset/flush handling through the processor's task-error path.

Also moves safe-head consolidation out of the engine task queue and onto direct Engine methods. ProcessSafeL2SignalRequest now executes consolidation directly, while delegated forkchoice reuses the same direct consolidation helper before finalizing. The old ConsolidateTask wrapper and unused queued seal/consolidate variants are deleted so the remaining queue only covers delegated forkchoice and finalize.

Move delegated forkchoice and finalized-head updates onto direct Engine methods. The engine processor now executes those paths directly and maps errors through the existing engine task severity handling. The old task wrappers remain as compatibility shells so the queue machinery can be deleted in the next stage without mixing behavior changes into this PR.

@vercel
Copy link
Copy Markdown

vercel Bot commented May 5, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
base Ignored Ignored Preview May 14, 2026 8:44pm

Request Review

Comment thread crates/consensus/engine/src/task_queue/core.rs
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
@refcell refcell added consensus Area: consensus stacked Meta: Stacked PR labels May 5, 2026
@refcell refcell self-assigned this May 5, 2026
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
Comment thread crates/consensus/engine/src/task_queue/core.rs
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
Move sequencer build and get-payload requests to direct Engine methods. Delete the GetPayloadTask wrapper and remove queued Build/GetPayload task variants while keeping insert and consolidation on the existing queue.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell force-pushed the rf/refactor/direct-engine-build-payload branch from 5cb8da3 to 7e0d354 Compare May 14, 2026 16:53
refcell and others added 3 commits May 14, 2026 13:08
Forward direct build failures to callers, avoid inline temporary-error retry loops, and harden direct get-payload metrics and payload-version handling.

Co-authored-by: Codex <noreply@openai.com>
Merge identical temporary-severity match arms in SealTaskError.

Co-authored-by: Codex <noreply@openai.com>
Avoid unchanged direct payload state broadcasts and route direct get-payload failures through engine severity handling.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell changed the title refactor(consensus): Move Insert Payload Off Task Queue refactor(consensus): Move Insert, Consolidate Off Task Queue May 14, 2026
@refcell refcell changed the title refactor(consensus): Move Insert, Consolidate Off Task Queue refactor(consensus): Cleanup Remaining Task Queue May 14, 2026
refcell and others added 6 commits May 14, 2026 15:22
Move unsafe payload insertion into direct Engine methods and delete the InsertTask wrapper. Keep consolidation, delegated forkchoice, finalization, and seal on the existing queue.

Co-authored-by: Codex <noreply@openai.com>
Apply nightly rustfmt import grouping and collapse the insert acknowledgement send check for clippy.

Co-authored-by: Codex <noreply@openai.com>
Propagate local no-ack insert errors through the existing engine task severity handler. Avoid redundant V3/V4 insert payload clones and clarify the newPayload duration log field.

Co-authored-by: Codex <noreply@openai.com>
Reshape the unsafe-payload processor rstest table around a case struct so the generated test does not trip clippy's argument limit. Correct the service README's description of when the local insert acknowledgement is sent versus when the sequencer client waits on the unsafe-head watch channel.

Co-authored-by: Codex <noreply@openai.com>
Move safe-head consolidation onto direct Engine methods and route ProcessSafeL2SignalRequest through them. Delete the ConsolidateTask wrapper and remove unused queued seal/consolidate variants so the remaining task queue only covers delegated forkchoice and finalize.

Co-authored-by: Codex <noreply@openai.com>
Move delegated forkchoice and finalized-head updates onto direct Engine methods. Route the engine processor through those methods and remove the old task wrappers and queue variants.

Merge duplicated finalize test match arms, preserve stale finalized-head no-op behavior in the direct path, and keep derived docs/manifest updates from the rebased base branch.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell force-pushed the rf/refactor/direct-engine-insert branch from 0519c51 to 134b7c4 Compare May 14, 2026 19:33
Comment thread crates/consensus/engine/src/task_queue/core.rs
Comment thread crates/consensus/engine/src/task_queue/core.rs
Co-authored-by: Codex <noreply@openai.com>
})
})
.await
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert_payload_with_retry_inner clones envelope on every Temporary retry via envelope.clone() inside the closure at line 287. Since BaseExecutionPayloadEnvelope contains a full transaction list, this means an allocation-heavy clone on each retry iteration even though the envelope is never mutated between attempts.

Consider restructuring so the closure captures a reference and only clones for the insert_payload_with_state call's ownership requirement — or better, have insert_payload_with_state take the envelope by reference (it already destructures into &execution_payload at line 366 for block construction, and the RPC call at lines 395-413 consumes the owned value but could potentially take a reference + clone internally only for the RPC serialization).

This also applies to consolidate (line 534, input.clone()) and delegated_forkchoice (line 771, update is Copy so no issue there).

}
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry_with_severity only publishes state (line 323) on Ok. On non-temporary errors (Reset, Flush, Critical), the state is NOT published even though the _with_state operation may have partially mutated self.state before failing. For example, delegated_forkchoice_with_state calls consolidate_with_state first — if consolidation succeeds (mutating state via SynchronizeTask::execute) but subsequent finalize_with_state fails with a Critical error, the consolidation state change is applied to self.state but never published to watch-channel subscribers.

In practice this may be safe if the caller (e.g. handle_engine_task_error) triggers a reset that publishes anyway, but it creates a window where self.state and the published state diverge. Consider publishing state on all non-temporary exits as well:

Err(err) => {
    let severity = err.severity();
    Metrics::engine_task_failure(label, severity.as_label()).increment(1);
    // Publish any partial state changes so subscribers stay consistent.
    self.state_sender.send_replace(self.state);
    match severity { ... }
}

match severity {
EngineTaskErrorSeverity::Temporary => {
trace!(target: "engine", error = %err, "Temporary engine error");
yield_now().await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The yield_now().await on Temporary errors creates an unbounded retry loop with no backoff or retry limit. If the EL returns Temporary errors persistently (e.g. reth overloaded, sustained network partition), this becomes a busy-wait that saturates CPU and floods the EL with requests — yield_now only yields to other ready tasks on the same tokio worker, not a meaningful delay.

Consider adding at minimum a small tokio::time::sleep (e.g. 10-100ms) instead of yield_now, or an exponential backoff with a cap. A bounded retry count with escalation to a higher severity would also prevent infinite loops:

EngineTaskErrorSeverity::Temporary => {
    trace!(target: "engine", error = %err, "Temporary engine error");
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}

);

Ok(new_block_ref)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert_payload_with_state constructs the BaseBlock from a clone of the payload (lines 366-384) solely to derive L2BlockInfo (line 387) and check is_unsafe_payload_applicable (line 390). The full block — including all deserialized transactions — is never used after that. This is expensive for payloads with large transaction lists.

If L2BlockInfo can be derived from the payload's header fields directly (block hash, number, parent hash, timestamp) plus the genesis config, consider computing it without the full block conversion. This would avoid cloning and deserializing the entire transaction list just to read header metadata. If the from_block_and_genesis API requires a full block, a lighter "from_payload_header_and_genesis" variant could be worth adding.

return Ok(());
}

Self::finalize_with_state(state, client, config, finalized_target).await?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegated_forkchoice_with_state is called inside retry_with_severity (line 767). If consolidate_with_state succeeds but finalize_with_state fails with a Temporary error, the entire operation is retried — including the already-successful consolidation. On retry, consolidate_with_state will re-fetch the block from reth and re-issue a FCU, which is wasted work.

This is pre-existing behavior from the old DelegatedForkchoiceTask, but now that both operations live in the same retry loop, consider short-circuiting the consolidation on retry when the safe head already matches update.safe_l2. The consolidate_with_state call does have some built-in idempotency (the FCU is a no-op if state matches), but it still fetches the block from reth on every attempt.

@github-actions
Copy link
Copy Markdown
Contributor

Review Summary

This PR moves insert, consolidate, delegated forkchoice, and finalize operations from the task-queue onto direct Engine methods, with a well-factored retry_with_severity helper that centralizes the retry/severity-dispatch loop. The deleted task wrappers and simplified queue are a clear improvement. Test coverage is thorough, particularly for the bootstrap role classification and unsafe payload processing paths.

Findings

Unbounded tight retry loop (core.rs:334): retry_with_severity uses yield_now().await on Temporary errors, creating a busy-wait with no backoff or retry cap. Under sustained EL pressure this will saturate CPU and flood the EL with requests. A small sleep or exponential backoff would be safer.

State not published on error exits (core.rs:352): retry_with_severity only publishes state via the watch channel on success. Non-temporary errors can leave partial state mutations (e.g., consolidation succeeds but finalization fails in delegated_forkchoice_with_state) unpublished, creating a divergence window between internal state and what subscribers see.

Per-retry envelope cloning (core.rs:301): insert_payload_with_retry_inner clones the full BaseExecutionPayloadEnvelope (including the transaction list) on every Temporary retry iteration. For payloads with large transaction lists, this is allocation-heavy. Consider having the _with_state method borrow the envelope instead.

Unnecessary full block construction for applicability check (core.rs:463): insert_payload_with_state builds a full BaseBlock (deserializing all transactions) solely to derive L2BlockInfo for the is_unsafe_payload_applicable guard. If L2BlockInfo can be derived from payload header fields directly, this would avoid the cost of full block conversion for payloads that get filtered out.

Redundant consolidation on delegated forkchoice retry (core.rs:808): When finalize_with_state fails with Temporary inside delegated_forkchoice_with_state, the retry re-executes the already-successful consolidation, re-fetching the block from reth and re-issuing a FCU.

Base automatically changed from rf/refactor/direct-engine-build-payload to main May 15, 2026 15:52
@cb-heimdall
Copy link
Copy Markdown
Collaborator

🟡 Heimdall Review Status

Requirement Status More Info
Reviews 🟡 0/1
Denominator calculation
Show calculation
1 if user is bot 0
1 if user is external 0
2 if repo is sensitive 0
From .codeflow.yml 1
Additional review requirements
Show calculation
Max 0
0
From CODEOWNERS 0
Global minimum 0
Max 1
1
1 if commit is unverified 0
Sum 1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

consensus Area: consensus stacked Meta: Stacked PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants