refactor(consensus): Cleanup Remaining Task Queue#2533
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
Move sequencer build and get-payload requests to direct Engine methods. Delete the GetPayloadTask wrapper and remove queued Build/GetPayload task variants while keeping insert and consolidation on the existing queue. Co-authored-by: Codex <noreply@openai.com>
5cb8da3 to
7e0d354
Compare
Forward direct build failures to callers, avoid inline temporary-error retry loops, and harden direct get-payload metrics and payload-version handling. Co-authored-by: Codex <noreply@openai.com>
Merge identical temporary-severity match arms in SealTaskError. Co-authored-by: Codex <noreply@openai.com>
Avoid unchanged direct payload state broadcasts and route direct get-payload failures through engine severity handling. Co-authored-by: Codex <noreply@openai.com>
Move unsafe payload insertion into direct Engine methods and delete the InsertTask wrapper. Keep consolidation, delegated forkchoice, finalization, and seal on the existing queue. Co-authored-by: Codex <noreply@openai.com>
Apply nightly rustfmt import grouping and collapse the insert acknowledgement send check for clippy. Co-authored-by: Codex <noreply@openai.com>
Propagate local no-ack insert errors through the existing engine task severity handler. Avoid redundant V3/V4 insert payload clones and clarify the newPayload duration log field. Co-authored-by: Codex <noreply@openai.com>
Reshape the unsafe-payload processor rstest table around a case struct so the generated test does not trip clippy's argument limit. Correct the service README's description of when the local insert acknowledgement is sent versus when the sequencer client waits on the unsafe-head watch channel. Co-authored-by: Codex <noreply@openai.com>
Move safe-head consolidation onto direct Engine methods and route ProcessSafeL2SignalRequest through them. Delete the ConsolidateTask wrapper and remove unused queued seal/consolidate variants so the remaining task queue only covers delegated forkchoice and finalize. Co-authored-by: Codex <noreply@openai.com>
Move delegated forkchoice and finalized-head updates onto direct Engine methods. Route the engine processor through those methods and remove the old task wrappers and queue variants. Merge duplicated finalize test match arms, preserve stale finalized-head no-op behavior in the direct path, and keep derived docs/manifest updates from the rebased base branch. Co-authored-by: Codex <noreply@openai.com>
0519c51 to
134b7c4
Compare
Co-authored-by: Codex <noreply@openai.com>
| }) | ||
| }) | ||
| .await | ||
| } |
There was a problem hiding this comment.
insert_payload_with_retry_inner clones envelope on every Temporary retry via envelope.clone() inside the closure at line 287. Since BaseExecutionPayloadEnvelope contains a full transaction list, this means an allocation-heavy clone on each retry iteration even though the envelope is never mutated between attempts.
Consider restructuring so the closure captures a reference and only clones for the insert_payload_with_state call's ownership requirement — or better, have insert_payload_with_state take the envelope by reference (it already destructures into &execution_payload at line 366 for block construction, and the RPC call at lines 395-413 consumes the owned value but could potentially take a reference + clone internally only for the RPC serialization).
This also applies to consolidate (line 534, input.clone()) and delegated_forkchoice (line 771, update is Copy so no issue there).
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
retry_with_severity only publishes state (line 323) on Ok. On non-temporary errors (Reset, Flush, Critical), the state is NOT published even though the _with_state operation may have partially mutated self.state before failing. For example, delegated_forkchoice_with_state calls consolidate_with_state first — if consolidation succeeds (mutating state via SynchronizeTask::execute) but subsequent finalize_with_state fails with a Critical error, the consolidation state change is applied to self.state but never published to watch-channel subscribers.
In practice this may be safe if the caller (e.g. handle_engine_task_error) triggers a reset that publishes anyway, but it creates a window where self.state and the published state diverge. Consider publishing state on all non-temporary exits as well:
Err(err) => {
let severity = err.severity();
Metrics::engine_task_failure(label, severity.as_label()).increment(1);
// Publish any partial state changes so subscribers stay consistent.
self.state_sender.send_replace(self.state);
match severity { ... }
}| match severity { | ||
| EngineTaskErrorSeverity::Temporary => { | ||
| trace!(target: "engine", error = %err, "Temporary engine error"); | ||
| yield_now().await; |
There was a problem hiding this comment.
The yield_now().await on Temporary errors creates an unbounded retry loop with no backoff or retry limit. If the EL returns Temporary errors persistently (e.g. reth overloaded, sustained network partition), this becomes a busy-wait that saturates CPU and floods the EL with requests — yield_now only yields to other ready tasks on the same tokio worker, not a meaningful delay.
Consider adding at minimum a small tokio::time::sleep (e.g. 10-100ms) instead of yield_now, or an exponential backoff with a cap. A bounded retry count with escalation to a higher severity would also prevent infinite loops:
EngineTaskErrorSeverity::Temporary => {
trace!(target: "engine", error = %err, "Temporary engine error");
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}| ); | ||
|
|
||
| Ok(new_block_ref) | ||
| } |
There was a problem hiding this comment.
insert_payload_with_state constructs the BaseBlock from a clone of the payload (lines 366-384) solely to derive L2BlockInfo (line 387) and check is_unsafe_payload_applicable (line 390). The full block — including all deserialized transactions — is never used after that. This is expensive for payloads with large transaction lists.
If L2BlockInfo can be derived from the payload's header fields directly (block hash, number, parent hash, timestamp) plus the genesis config, consider computing it without the full block conversion. This would avoid cloning and deserializing the entire transaction list just to read header metadata. If the from_block_and_genesis API requires a full block, a lighter "from_payload_header_and_genesis" variant could be worth adding.
| return Ok(()); | ||
| } | ||
|
|
||
| Self::finalize_with_state(state, client, config, finalized_target).await?; |
There was a problem hiding this comment.
delegated_forkchoice_with_state is called inside retry_with_severity (line 767). If consolidate_with_state succeeds but finalize_with_state fails with a Temporary error, the entire operation is retried — including the already-successful consolidation. On retry, consolidate_with_state will re-fetch the block from reth and re-issue a FCU, which is wasted work.
This is pre-existing behavior from the old DelegatedForkchoiceTask, but now that both operations live in the same retry loop, consider short-circuiting the consolidation on retry when the safe head already matches update.safe_l2. The consolidate_with_state call does have some built-in idempotency (the FCU is a no-op if state matches), but it still fetches the block from reth on every attempt.
Review SummaryThis PR moves insert, consolidate, delegated forkchoice, and finalize operations from the task-queue onto direct FindingsUnbounded tight retry loop ( State not published on error exits ( Per-retry envelope cloning ( Unnecessary full block construction for applicability check ( Redundant consolidation on delegated forkchoice retry ( |
🟡 Heimdall Review Status
|
Summary
This moves unsafe payload insertion out of the engine task queue and onto direct Engine methods. The remaining queue now only owns seal, consolidate, delegated forkchoice, and finalize work, while sequencer insert acknowledgement still waits for the inserted unsafe head. External unsafe inserts keep the existing retry and reset/flush handling through the processor's task-error path.
Also moves safe-head consolidation out of the engine task queue and onto direct Engine methods. ProcessSafeL2SignalRequest now executes consolidation directly, while delegated forkchoice reuses the same direct consolidation helper before finalizing. The old ConsolidateTask wrapper and unused queued seal/consolidate variants are deleted so the remaining queue only covers delegated forkchoice and finalize.
Move delegated forkchoice and finalized-head updates onto direct Engine methods. The engine processor now executes those paths directly and maps errors through the existing engine task severity handling. The old task wrappers remain as compatibility shells so the queue machinery can be deleted in the next stage without mixing behavior changes into this PR.