From 9a7d364bbd77d48cdad879a188824419241e65a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 29 May 2026 14:18:48 -0300 Subject: [PATCH] feat(p2p): log publish-side gossip diagnostics before publish Emit a diagnostic log line on every gossip publish (block, attestation, aggregation) immediately after snappy encoding and before publish(), so a captured on-wire message can be reconciled against what a peer logs on receipt. This is to debug cross-client snappy/SSZ corruption such as blockblaz/zeam#942. Each line carries topic, slot (plus proposer/block_root for blocks), sha256 of the SSZ and of the compressed payload, compressed_len, a snappy self-decode round-trip check (local-encoder canary), the gossipsub message_id, the client git SHA, and the snappy lib/version. The message_id is computed through a shared gossip_message_id(topic, data) that the gossipsub message_id_fn now also delegates to, so the logged id provably matches the one peers assign. git SHA and the resolved `snap` crate version are surfaced at build time via a new p2p build.rs (vergen for the SHA, Cargo.lock parse for the snappy version); hex moves from a dev-dependency to a regular dependency. --- Cargo.lock | 1 + crates/net/p2p/Cargo.toml | 5 +- crates/net/p2p/build.rs | 58 +++++++++++++++ crates/net/p2p/src/gossipsub/handler.rs | 96 ++++++++++++++++++++++++- crates/net/p2p/src/lib.rs | 38 ++++++---- 5 files changed, 182 insertions(+), 16 deletions(-) create mode 100644 crates/net/p2p/build.rs diff --git a/Cargo.lock b/Cargo.lock index 0f7c1d75..70cc210e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2118,6 +2118,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "vergen-git2", ] [[package]] diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index d766b6a8..5663ec93 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -44,6 +44,7 @@ libssz-merkle.workspace = true libssz-types.workspace = true sha2 = "0.10" - -[dev-dependencies] hex.workspace = true + +[build-dependencies] +vergen-git2.workspace = true diff --git a/crates/net/p2p/build.rs b/crates/net/p2p/build.rs new file mode 100644 index 00000000..b0af9402 --- /dev/null +++ b/crates/net/p2p/build.rs @@ -0,0 +1,58 @@ +use std::{env, fs, path::PathBuf}; + +use vergen_git2::{Emitter, Git2Builder}; + +fn main() -> Result<(), Box> { + // Embed the build's short git SHA (consumed via env!("VERGEN_GIT_SHA")) so + // publish-side gossip diagnostics can report which client build emitted a + // message. Mirrors bin/ethlambda/build.rs. + let git2 = Git2Builder::default().sha(true).build()?; + Emitter::default().add_instructions(&git2)?.emit()?; + + // Surface the resolved `snap` crate version so the same diagnostics can + // record which snappy implementation produced the compressed payload. The + // version is not available via any standard Cargo env var, so we read it + // out of the workspace lockfile. + let snap_version = snap_version().unwrap_or_else(|| "unknown".to_string()); + println!("cargo:rustc-env=SNAP_VERSION={snap_version}"); + + Ok(()) +} + +/// Parse the `snap` package version out of the workspace `Cargo.lock`. +/// +/// Walks up from this crate's manifest dir until a `Cargo.lock` is found, then +/// scans for the `snap` package entry. Returns `None` if the lockfile or entry +/// is missing (the caller falls back to `"unknown"`). +fn snap_version() -> Option { + let mut dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").ok()?); + let lockfile = loop { + let candidate = dir.join("Cargo.lock"); + if candidate.is_file() { + break candidate; + } + if !dir.pop() { + return None; + } + }; + println!("cargo:rerun-if-changed={}", lockfile.display()); + + let contents = fs::read_to_string(&lockfile).ok()?; + let mut lines = contents.lines(); + while let Some(line) = lines.next() { + if line.trim() != "name = \"snap\"" { + continue; + } + // Within the same `[[package]]` block, find the version line. + for next in lines.by_ref() { + let next = next.trim(); + if next.starts_with("[[package]]") { + break; + } + if let Some(version) = next.strip_prefix("version = \"") { + return Some(version.trim_end_matches('"').to_string()); + } + } + } + None +} diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index c257006b..fc66e069 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -6,6 +6,7 @@ use ethlambda_types::{ }; use libp2p::gossipsub::Event; use libssz::{SszDecode, SszEncode}; +use sha2::{Digest as _, Sha256}; use tracing::{error, info, trace}; use super::{ @@ -15,7 +16,52 @@ use super::{ attestation_subnet_topic, }, }; -use crate::{P2PServer, metrics}; +use crate::{P2PServer, gossip_message_id, metrics}; + +/// Short git SHA of this build, embedded by `build.rs`. Logged with publish-side +/// gossip diagnostics so a captured message can be traced to the emitting build. +const CLIENT_GIT_SHA: &str = env!("VERGEN_GIT_SHA"); + +/// Snappy implementation and resolved version (the Rust `snap` crate), embedded +/// by `build.rs`. Logged so cross-client byte comparisons can attribute the +/// compressed output to a specific snappy library. +const SNAPPY_LIB_VERSION: &str = concat!("rust-snap/", env!("SNAP_VERSION")); + +/// Pre-publish diagnostics for a gossipsub message, capturing the exact bytes a +/// node is about to put on the wire. Used to debug cross-client snappy/SSZ +/// corruption (e.g. blockblaz/zeam#942): comparing these fields against what a +/// peer logs on receipt pinpoints whether divergence is at the compression, +/// transport, or decode stage. +struct PublishDiagnostics { + /// Lowercase hex SHA256 of the uncompressed SSZ payload. + ssz_sha256: String, + /// Lowercase hex SHA256 of the snappy-compressed payload (the on-wire bytes). + compressed_sha256: String, + /// Length in bytes of the compressed payload. + compressed_len: usize, + /// Whether decompressing our own output round-trips back to the SSZ bytes. + /// `false` signals a local snappy encoder bug before the message ever leaves. + snappy_self_decode_ok: bool, + /// Lowercase hex gossipsub message ID, computed identically to the receive + /// path so it matches the ID peers will assign. + message_id: String, +} + +impl PublishDiagnostics { + /// Compute diagnostics for `topic` from the uncompressed `ssz` and its + /// `compressed` (on-wire) form. + fn new(topic: &str, ssz: &[u8], compressed: &[u8]) -> Self { + let snappy_self_decode_ok = + decompress_message(compressed).is_ok_and(|decoded| decoded == ssz); + Self { + ssz_sha256: hex::encode(Sha256::digest(ssz)), + compressed_sha256: hex::encode(Sha256::digest(compressed)), + compressed_len: compressed.len(), + snappy_self_decode_ok, + message_id: hex::encode(gossip_message_id(topic, compressed)), + } + } +} pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let Event::Message { @@ -154,6 +200,22 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte .cloned() .unwrap_or_else(|| attestation_subnet_topic(subnet_id)); + let topic_hash = topic.hash(); + let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed); + info!( + topic = %topic_hash, + %slot, + validator, + ssz_sha256 = %diagnostics.ssz_sha256, + compressed_sha256 = %diagnostics.compressed_sha256, + compressed_len = diagnostics.compressed_len, + snappy_self_decode_ok = diagnostics.snappy_self_decode_ok, + message_id = %diagnostics.message_id, + git_sha = CLIENT_GIT_SHA, + snappy = SNAPPY_LIB_VERSION, + "Publishing attestation to gossipsub (publish diagnostics)" + ); + server.swarm_handle.publish(topic, compressed); info!( %slot, @@ -182,6 +244,23 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len()); + let topic_hash = server.block_topic.hash(); + let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed); + info!( + topic = %topic_hash, + %slot, + proposer, + block_root = %hex::encode(block_root.0), + ssz_sha256 = %diagnostics.ssz_sha256, + compressed_sha256 = %diagnostics.compressed_sha256, + compressed_len = diagnostics.compressed_len, + snappy_self_decode_ok = diagnostics.snappy_self_decode_ok, + message_id = %diagnostics.message_id, + git_sha = CLIENT_GIT_SHA, + snappy = SNAPPY_LIB_VERSION, + "Publishing block to gossipsub (publish diagnostics)" + ); + // Publish to gossipsub server .swarm_handle @@ -210,6 +289,21 @@ pub async fn publish_aggregated_attestation( metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len()); + let topic_hash = server.aggregation_topic.hash(); + let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed); + info!( + topic = %topic_hash, + %slot, + ssz_sha256 = %diagnostics.ssz_sha256, + compressed_sha256 = %diagnostics.compressed_sha256, + compressed_len = diagnostics.compressed_len, + snappy_self_decode_ok = diagnostics.snappy_self_decode_ok, + message_id = %diagnostics.message_id, + git_sha = CLIENT_GIT_SHA, + snappy = SNAPPY_LIB_VERSION, + "Publishing aggregated attestation to gossipsub (publish diagnostics)" + ); + // Publish to the aggregation topic server .swarm_handle diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index f1fa5fe4..11261ce6 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -694,25 +694,37 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str } } -fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { +/// Compute the gossipsub message ID for a topic and its on-wire (snappy) data. +/// +/// The ID is the 20-byte truncated SHA256 of `domain || topic_len_le || topic +/// || payload`, where `payload` is the snappy-decompressed bytes when `data` +/// is valid snappy (domain `0x01000000`) and the raw `data` otherwise (domain +/// `0x00000000`). Shared by the gossipsub `message_id_fn` and the publish-side +/// diagnostics so the logged ID matches the one peers will assign. +pub(crate) fn gossip_message_id(topic: &str, data: &[u8]) -> [u8; 20] { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; - let mut hasher = sha2::Sha256::new(); - let decompressed = gossipsub::decompress_message(&message.data).ok(); - - let (domain, data) = match decompressed.as_deref() { - Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data), - None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()), + let decompressed = gossipsub::decompress_message(data).ok(); + let (domain, payload) = match decompressed.as_deref() { + Some(payload) => (MESSAGE_DOMAIN_VALID_SNAPPY, payload), + None => (MESSAGE_DOMAIN_INVALID_SNAPPY, data), }; - let topic = message.topic.as_str().as_bytes(); - let topic_len = (topic.len() as u64).to_le_bytes(); + + let mut hasher = sha2::Sha256::new(); hasher.update(domain); - hasher.update(topic_len); - hasher.update(topic); - hasher.update(data); + hasher.update((topic.len() as u64).to_le_bytes()); + hasher.update(topic.as_bytes()); + hasher.update(payload); let hash = hasher.finalize(); - libp2p::gossipsub::MessageId(hash[..20].to_vec()) + + let mut id = [0u8; 20]; + id.copy_from_slice(&hash[..20]); + id +} + +fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { + libp2p::gossipsub::MessageId(gossip_message_id(message.topic.as_str(), &message.data).to_vec()) } #[cfg(test)]