Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/net/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libssz-merkle.workspace = true
libssz-types.workspace = true

sha2 = "0.10"

[dev-dependencies]
hex.workspace = true

[build-dependencies]
vergen-git2.workspace = true
58 changes: 58 additions & 0 deletions crates/net/p2p/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::{env, fs, path::PathBuf};

use vergen_git2::{Emitter, Git2Builder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
// Embed the build's short git SHA (consumed via env!("VERGEN_GIT_SHA")) so
// publish-side gossip diagnostics can report which client build emitted a
// message. Mirrors bin/ethlambda/build.rs.
let git2 = Git2Builder::default().sha(true).build()?;
Emitter::default().add_instructions(&git2)?.emit()?;

// Surface the resolved `snap` crate version so the same diagnostics can
// record which snappy implementation produced the compressed payload. The
// version is not available via any standard Cargo env var, so we read it
// out of the workspace lockfile.
let snap_version = snap_version().unwrap_or_else(|| "unknown".to_string());
println!("cargo:rustc-env=SNAP_VERSION={snap_version}");

Ok(())
}

/// Parse the `snap` package version out of the workspace `Cargo.lock`.
///
/// Walks up from this crate's manifest dir until a `Cargo.lock` is found, then
/// scans for the `snap` package entry. Returns `None` if the lockfile or entry
/// is missing (the caller falls back to `"unknown"`).
fn snap_version() -> Option<String> {
let mut dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").ok()?);
let lockfile = loop {
let candidate = dir.join("Cargo.lock");
if candidate.is_file() {
break candidate;
}
if !dir.pop() {
return None;
}
};
println!("cargo:rerun-if-changed={}", lockfile.display());

let contents = fs::read_to_string(&lockfile).ok()?;
let mut lines = contents.lines();
while let Some(line) = lines.next() {
if line.trim() != "name = \"snap\"" {
continue;
}
// Within the same `[[package]]` block, find the version line.
for next in lines.by_ref() {
let next = next.trim();
if next.starts_with("[[package]]") {
break;
}
if let Some(version) = next.strip_prefix("version = \"") {
return Some(version.trim_end_matches('"').to_string());
}
}
}
None
}
96 changes: 95 additions & 1 deletion crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ethlambda_types::{
};
use libp2p::gossipsub::Event;
use libssz::{SszDecode, SszEncode};
use sha2::{Digest as _, Sha256};
use tracing::{error, info, trace};

use super::{
Expand All @@ -15,7 +16,52 @@ use super::{
attestation_subnet_topic,
},
};
use crate::{P2PServer, metrics};
use crate::{P2PServer, gossip_message_id, metrics};

/// Short git SHA of this build, embedded by `build.rs`. Logged with publish-side
/// gossip diagnostics so a captured message can be traced to the emitting build.
const CLIENT_GIT_SHA: &str = env!("VERGEN_GIT_SHA");

/// Snappy implementation and resolved version (the Rust `snap` crate), embedded
/// by `build.rs`. Logged so cross-client byte comparisons can attribute the
/// compressed output to a specific snappy library.
const SNAPPY_LIB_VERSION: &str = concat!("rust-snap/", env!("SNAP_VERSION"));

/// Pre-publish diagnostics for a gossipsub message, capturing the exact bytes a
/// node is about to put on the wire. Used to debug cross-client snappy/SSZ
/// corruption (e.g. blockblaz/zeam#942): comparing these fields against what a
/// peer logs on receipt pinpoints whether divergence is at the compression,
/// transport, or decode stage.
struct PublishDiagnostics {
/// Lowercase hex SHA256 of the uncompressed SSZ payload.
ssz_sha256: String,
/// Lowercase hex SHA256 of the snappy-compressed payload (the on-wire bytes).
compressed_sha256: String,
/// Length in bytes of the compressed payload.
compressed_len: usize,
/// Whether decompressing our own output round-trips back to the SSZ bytes.
/// `false` signals a local snappy encoder bug before the message ever leaves.
snappy_self_decode_ok: bool,
/// Lowercase hex gossipsub message ID, computed identically to the receive
/// path so it matches the ID peers will assign.
message_id: String,
}

impl PublishDiagnostics {
/// Compute diagnostics for `topic` from the uncompressed `ssz` and its
/// `compressed` (on-wire) form.
fn new(topic: &str, ssz: &[u8], compressed: &[u8]) -> Self {
let snappy_self_decode_ok =
decompress_message(compressed).is_ok_and(|decoded| decoded == ssz);
Self {
ssz_sha256: hex::encode(Sha256::digest(ssz)),
compressed_sha256: hex::encode(Sha256::digest(compressed)),
compressed_len: compressed.len(),
snappy_self_decode_ok,
message_id: hex::encode(gossip_message_id(topic, compressed)),
}
}
}

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand Down Expand Up @@ -154,6 +200,22 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
.cloned()
.unwrap_or_else(|| attestation_subnet_topic(subnet_id));

let topic_hash = topic.hash();
let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
info!(
topic = %topic_hash,
%slot,
validator,
ssz_sha256 = %diagnostics.ssz_sha256,
compressed_sha256 = %diagnostics.compressed_sha256,
compressed_len = diagnostics.compressed_len,
snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
message_id = %diagnostics.message_id,
git_sha = CLIENT_GIT_SHA,
snappy = SNAPPY_LIB_VERSION,
"Publishing attestation to gossipsub (publish diagnostics)"
);

server.swarm_handle.publish(topic, compressed);
info!(
%slot,
Expand Down Expand Up @@ -182,6 +244,23 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {

metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len());

let topic_hash = server.block_topic.hash();
let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
info!(
topic = %topic_hash,
%slot,
proposer,
block_root = %hex::encode(block_root.0),
ssz_sha256 = %diagnostics.ssz_sha256,
compressed_sha256 = %diagnostics.compressed_sha256,
compressed_len = diagnostics.compressed_len,
snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
message_id = %diagnostics.message_id,
git_sha = CLIENT_GIT_SHA,
snappy = SNAPPY_LIB_VERSION,
"Publishing block to gossipsub (publish diagnostics)"
);

// Publish to gossipsub
server
.swarm_handle
Expand Down Expand Up @@ -210,6 +289,21 @@ pub async fn publish_aggregated_attestation(

metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len());

let topic_hash = server.aggregation_topic.hash();
let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
info!(
topic = %topic_hash,
%slot,
ssz_sha256 = %diagnostics.ssz_sha256,
compressed_sha256 = %diagnostics.compressed_sha256,
compressed_len = diagnostics.compressed_len,
snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
message_id = %diagnostics.message_id,
git_sha = CLIENT_GIT_SHA,
snappy = SNAPPY_LIB_VERSION,
"Publishing aggregated attestation to gossipsub (publish diagnostics)"
);

// Publish to the aggregation topic
server
.swarm_handle
Expand Down
38 changes: 25 additions & 13 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,25 +694,37 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str
}
}

fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
/// Compute the gossipsub message ID for a topic and its on-wire (snappy) data.
///
/// The ID is the 20-byte truncated SHA256 of `domain || topic_len_le || topic
/// || payload`, where `payload` is the snappy-decompressed bytes when `data`
/// is valid snappy (domain `0x01000000`) and the raw `data` otherwise (domain
/// `0x00000000`). Shared by the gossipsub `message_id_fn` and the publish-side
/// diagnostics so the logged ID matches the one peers will assign.
pub(crate) fn gossip_message_id(topic: &str, data: &[u8]) -> [u8; 20] {
const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00];
const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];

let mut hasher = sha2::Sha256::new();
let decompressed = gossipsub::decompress_message(&message.data).ok();

let (domain, data) = match decompressed.as_deref() {
Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data),
None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()),
let decompressed = gossipsub::decompress_message(data).ok();
let (domain, payload) = match decompressed.as_deref() {
Some(payload) => (MESSAGE_DOMAIN_VALID_SNAPPY, payload),
None => (MESSAGE_DOMAIN_INVALID_SNAPPY, data),
};
let topic = message.topic.as_str().as_bytes();
let topic_len = (topic.len() as u64).to_le_bytes();

let mut hasher = sha2::Sha256::new();
hasher.update(domain);
hasher.update(topic_len);
hasher.update(topic);
hasher.update(data);
hasher.update((topic.len() as u64).to_le_bytes());
hasher.update(topic.as_bytes());
hasher.update(payload);
let hash = hasher.finalize();
libp2p::gossipsub::MessageId(hash[..20].to_vec())

let mut id = [0u8; 20];
id.copy_from_slice(&hash[..20]);
id
}

fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
libp2p::gossipsub::MessageId(gossip_message_id(message.topic.as_str(), &message.data).to_vec())
}

#[cfg(test)]
Expand Down
Loading