Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::state::{State, Validator, anchor_pair_is_consistent};
use libssz::{DecodeError, SszDecode};
use reqwest::Client;
use tracing::{info, warn};

/// Timeout for establishing the HTTP connection to the checkpoint peer.
/// Fail fast if the peer is unreachable.
Expand Down Expand Up @@ -58,6 +59,10 @@ pub enum CheckpointSyncError {
BlockHeaderJustifiedRootMismatch,
#[error("anchor block does not match anchor state")]
AnchorPairingMismatch,
#[error("all checkpoint peers failed; last error: {0}")]
AllPeersFailed(Box<CheckpointSyncError>),
#[error("no checkpoint peers configured")]
NoPeersConfigured,
}

/// Build the HTTP client used for checkpoint sync fetches.
Expand Down Expand Up @@ -269,6 +274,73 @@ fn verify_checkpoint_state(
Ok(())
}

/// Render `url` for log output with anything that could be a secret removed.
///
/// Checkpoint URLs may embed basic-auth credentials (`scheme://user:pass@host`)
/// or token query parameters, so the raw string must never reach a log line.
/// The userinfo component, the query string and the fragment are dropped;
/// scheme, host, port and path are kept so operators can still tell peers apart.
fn redact_url_for_log(url: &str) -> String {
    // Query and fragment start at the first `?` or `#`; cut both off.
    let base = url.find(['?', '#']).map_or(url, |cut| &url[..cut]);

    // Split off a leading `scheme://` so the authority can be isolated. Only
    // accept it when everything before `://` is made of valid scheme characters,
    // so a `://` buried in the path of a scheme-less string is not mistaken
    // for one.
    let (prefix, rest) = match base.find("://") {
        Some(idx)
            if base[..idx]
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.')) =>
        {
            base.split_at(idx + 3)
        }
        _ => ("", base),
    };

    // The authority ends at the first `/`; userinfo, if present, ends at the
    // last `@` inside the authority.
    let (authority, path) = rest.split_at(rest.find('/').unwrap_or(rest.len()));
    let host = authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host);

    format!("{prefix}{host}{path}")
}

/// Fetch the finalized anchor from a single checkpoint URL.
///
/// The underlying fetch can fail with `AnchorPairingMismatch` when the state
/// and block it returns do not belong together (the peer likely advanced
/// finalization between the two requests). That specific error is retried, up
/// to `MAX_ANCHOR_FETCH_ATTEMPTS` attempts in total with a fixed
/// `ANCHOR_FETCH_RETRY_DELAY` pause between attempts.
///
/// # Errors
///
/// Any error other than `AnchorPairingMismatch` is returned immediately. An
/// `AnchorPairingMismatch` on the final attempt is returned as-is.
async fn try_checkpoint_url(
    url: &str,
    genesis_time: u64,
    validators: &[Validator],
) -> Result<(State, SignedBlock), CheckpointSyncError> {
    const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
    const ANCHOR_FETCH_RETRY_DELAY: Duration = Duration::from_secs(1);

    let mut attempt = 1;
    loop {
        match fetch_finalized_anchor(url, genesis_time, validators).await {
            // Only the pairing race is worth retrying against the same peer.
            Err(CheckpointSyncError::AnchorPairingMismatch)
                if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
            {
                warn!(
                    // Never log the raw URL: it may contain credentials.
                    url = %redact_url_for_log(url),
                    attempt,
                    max = MAX_ANCHOR_FETCH_ATTEMPTS,
                    "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
                );
                tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
                attempt += 1;
            }
            // Success, a non-retryable error, or a mismatch on the last attempt.
            outcome => return outcome,
        }
    }
}

/// Try each checkpoint URL in order, returning the first successful anchor
/// pair.
///
/// Logs per-peer success/failure with the URL redacted (credentials, query
/// string and fragment removed).
///
/// # Errors
///
/// * `NoPeersConfigured` if `checkpoint_urls` is empty.
/// * `AllPeersFailed` wrapping the last peer's error if every URL failed, so
///   the underlying cause is preserved in the propagated error (not just in
///   the per-peer `warn!` logs).
pub async fn fetch_anchor_from_peers(
    checkpoint_urls: &[String],
    genesis_time: u64,
    validators: &[Validator],
) -> Result<(State, SignedBlock), CheckpointSyncError> {
    let mut last_err: Option<CheckpointSyncError> = None;

    for (index, url) in checkpoint_urls.iter().enumerate() {
        // Never log the raw URL: it may contain credentials.
        let safe_url = redact_url_for_log(url);

        match try_checkpoint_url(url, genesis_time, validators).await {
            Ok(pair) => {
                info!(url = %safe_url, "Checkpoint sync successful with this peer");
                return Ok(pair);
            }
            Err(err) => {
                // NOTE(review): `err` is logged via its `Display` impl; if it wraps
                // an HTTP client error it may itself embed the request URL —
                // confirm that is stripped where the error is constructed.
                if index + 1 < checkpoint_urls.len() {
                    warn!(url = %safe_url, %err, "Checkpoint sync failed for this peer; trying next URL");
                } else {
                    warn!(url = %safe_url, %err, "Checkpoint sync failed for this peer; no more URLs to try");
                }
                last_err = Some(err);
            }
        }
    }

    // Reaching here means no URL succeeded: either none were given or all failed.
    Err(match last_err {
        Some(err) => CheckpointSyncError::AllPeersFailed(Box::new(err)),
        None => CheckpointSyncError::NoPeersConfigured,
    })
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
93 changes: 42 additions & 51 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052).
/// Base URL(s) of checkpoint-sync peer API servers (e.g., http://peer:5052).
/// When set, skips genesis initialization and fetches the finalized state
/// and block from the peer's `/lean/v0/states/finalized` and
/// and block from each peer's `/lean/v0/states/finalized` and
/// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a
/// URL ending in `/lean/v0/states/finalized` is accepted and the trailing
/// path is stripped.
#[arg(long)]
checkpoint_sync_url: Option<String>,
///
/// Multiple URLs may be supplied for redundancy, either comma-separated
/// (`--checkpoint-sync-url u1,u2`) or by repeating the flag
/// (`--checkpoint-sync-url u1 --checkpoint-sync-url u2`). URLs are tried
/// in order; the first one that succeeds is used and any failures fall
/// over to the next URL. Startup only aborts if every URL fails.
#[arg(long, value_delimiter = ',')]
checkpoint_sync_url: Vec<String>,
/// Whether this node acts as a committee aggregator.
///
/// Seeds the initial value of the live aggregator flag shared by the
Expand Down Expand Up @@ -206,13 +212,16 @@ async fn main() -> eyre::Result<()> {
std::fs::create_dir_all(&data_dir).expect("Failed to create data directory");
let backend = Arc::new(RocksDBBackend::open(&data_dir).expect("Failed to open RocksDB"));

let store = fetch_initial_state(
options.checkpoint_sync_url.as_deref(),
&genesis_config,
backend.clone(),
)
.await
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;
let clean_checkpoint_urls: Vec<String> = options
.checkpoint_sync_url
.into_iter()
.map(|url| url.trim().to_string())
.filter(|url| !url.is_empty())
.collect();

let store = fetch_initial_state(&clean_checkpoint_urls, &genesis_config, backend.clone())
.await
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;

let validator_ids: Vec<u64> = validator_keys.keys().copied().collect();

Expand Down Expand Up @@ -560,10 +569,11 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {

/// Fetch the initial state for the node.
///
/// If `checkpoint_url` is provided, performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block in parallel from a
/// remote peer. Otherwise, creates a genesis state from the local genesis
/// configuration.
/// If `checkpoint_urls` is empty, creates a genesis state from the local
/// genesis configuration. Otherwise performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block from a peer. URLs are
/// tried in order: the first peer that succeeds wins, and failures fall over
/// to the next URL. Startup only aborts if every URL fails.
///
/// Fetching the matching signed block lets the local store serve a valid
/// anchor via the `BlocksByRoot` req-resp protocol; without it, peers
Expand All @@ -572,7 +582,7 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
///
/// # Arguments
///
/// * `checkpoint_url` - Optional base URL to a peer's API server
/// * `checkpoint_urls` - Zero or more base URLs of peer API servers
/// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation)
/// * `backend` - Storage backend for Store creation
///
Expand All @@ -581,51 +591,32 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
/// `Ok(Store)` on success, or `Err(CheckpointSyncError)` if checkpoint sync fails.
/// Genesis path is infallible and always returns `Ok`.
async fn fetch_initial_state(
checkpoint_url: Option<&str>,
checkpoint_urls: &[String],
genesis: &GenesisConfig,
backend: Arc<dyn StorageBackend>,
) -> Result<Store, checkpoint_sync::CheckpointSyncError> {
let validators = genesis.validators();

let Some(checkpoint_url) = checkpoint_url else {
if checkpoint_urls.is_empty() {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
};

// Checkpoint sync path
info!(%checkpoint_url, "Starting checkpoint sync");

// The state and block are fetched in parallel; if the peer advances
// finalization between the two requests the pair won't match. Retry a
// small number of times so this transient race doesn't fail node startup.
const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

let mut attempt = 1;
let (state, signed_block) = loop {
match checkpoint_sync::fetch_finalized_anchor(
checkpoint_url,
genesis.genesis_time,
&validators,
)
.await
{
Ok(pair) => break pair,
Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
{
warn!(
attempt,
max = MAX_ANCHOR_FETCH_ATTEMPTS,
"Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
);
tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
attempt += 1;
}
Err(err) => return Err(err),
}
};
// Checkpoint sync path: try URLs in order, fail over to the next on error.
// Log only the count — URLs may carry basic-auth credentials or token query
// parameters; per-URL log lines below redact those before emission.
Comment thread
Aliemeka marked this conversation as resolved.
info!(
url_count = checkpoint_urls.len(),
"Starting checkpoint sync"
);

let (state, signed_block) = checkpoint_sync::fetch_anchor_from_peers(
checkpoint_urls,
genesis.genesis_time,
&validators,
)
.await?;

info!(
slot = state.slot,
Expand Down