Skip to content

Commit 2b844cc

Browse files
committed
fix(consensus): mempool stats across subset change
because stats need data from previous round
1 parent 43c162e commit 2b844cc

9 files changed

Lines changed: 72 additions & 101 deletions

File tree

collator/src/collator/anchors_cache.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use std::collections::VecDeque;
22
use std::sync::Arc;
33

44
use tycho_network::PeerId;
5-
use tycho_slasher_traits::{AnchorPeerStats, AnchorStats, AnchorStatsRange, CollatedAnchorStats};
5+
use tycho_slasher_traits::{
6+
AnchorPeerStats, AnchorStats, AnchorStatsRange, CollatedAnchorStats, PeerIdInner,
7+
};
68
use tycho_util::FastHashMap;
79

810
use crate::collator::messages_reader::state::ext::ExternalKey;
@@ -320,7 +322,10 @@ impl<'a> AnchorsCacheTransaction<'a> {
320322
prev_imported_chain_time: u64,
321323
next_chain_time: u64,
322324
) -> Option<CollatedAnchorStats> {
323-
fn merge_anchor_stats(acc: &mut FastHashMap<u16, AnchorPeerStats>, stats: &AnchorStats) {
325+
fn merge_anchor_stats(
326+
acc: &mut FastHashMap<PeerIdInner, AnchorPeerStats>,
327+
stats: &AnchorStats,
328+
) {
324329
for (validator_idx, peer_stats) in stats.0.iter() {
325330
acc.entry(*validator_idx)
326331
.and_modify(|exists| {
@@ -405,7 +410,7 @@ mod tests {
405410
fn make_anchor_with_stats(
406411
id: MempoolAnchorId,
407412
chain_time: u64,
408-
validator_idx_stats: &[(u16, u16)],
413+
peer_id_stats: &[(PeerIdInner, u16)],
409414
) -> Arc<MempoolAnchor> {
410415
Arc::new(MempoolAnchor {
411416
id,
@@ -414,13 +419,13 @@ mod tests {
414419
author: PeerId([0; 32]),
415420
externals: Default::default(),
416421
stats: AnchorStats(Arc::new(
417-
validator_idx_stats
422+
peer_id_stats
418423
.iter()
419-
.map(|(validator_idx, points_proven)| {
424+
.map(|(peer_id, points_proven)| {
420425
let stats = AnchorPeerStats {
421426
points_proven: *points_proven,
422427
};
423-
(*validator_idx, stats)
428+
(*peer_id, stats)
424429
})
425430
.collect(),
426431
)),
@@ -449,10 +454,13 @@ mod tests {
449454
#[test]
450455
fn test_collect_imported_anchor_stats_uses_trimmed_interval() {
451456
let mut cache = AnchorsCache::default();
452-
cache.add(make_anchor_with_stats(10, 100, &[(1, 1)]), 0);
453-
cache.add(make_anchor_with_stats(11, 200, &[(1, 2), (2, 3)]), 0);
454-
cache.add(make_anchor_with_stats(12, 300, &[(1, 4)]), 0);
455-
cache.add(make_anchor_with_stats(13, 400, &[(2, 5)]), 0);
457+
let p1 = [1; _];
458+
let p2 = [2; _];
459+
460+
cache.add(make_anchor_with_stats(10, 100, &[(p1, 1)]), 0);
461+
cache.add(make_anchor_with_stats(11, 200, &[(p1, 2), (p2, 3)]), 0);
462+
cache.add(make_anchor_with_stats(12, 300, &[(p1, 4)]), 0);
463+
cache.add(make_anchor_with_stats(13, 400, &[(p2, 5)]), 0);
456464

457465
let mut tx = AnchorsCacheTransaction::new(&mut cache);
458466
tx.remove_last_imported_above(320);
@@ -464,8 +472,8 @@ mod tests {
464472
top_anchor: 12,
465473
});
466474
assert_eq!(collated_stats.stats.0.len(), 2);
467-
assert_eq!(collated_stats.stats.0.get(&1).unwrap().points_proven, 6);
468-
assert_eq!(collated_stats.stats.0.get(&2).unwrap().points_proven, 3);
475+
assert_eq!(collated_stats.stats.0.get(&p1).unwrap().points_proven, 6);
476+
assert_eq!(collated_stats.stats.0.get(&p2).unwrap().points_proven, 3);
469477
}
470478

471479
#[test]

collator/src/validator/impls/std_impl/session.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ impl ValidatorSession {
264264
);
265265

266266
// Notify listeners about the own signature
267-
events_scope.receive_signature(self.inner.own_validator_idx, true);
267+
events_scope.receive_signature(self.inner.peer_id.as_bytes(), true);
268268

269269
let mut total_weight = self.inner.own_weight;
270270

@@ -430,7 +430,6 @@ impl ValidatorSession {
430430
};
431431

432432
let validator_info = validators.get(peer_id).expect("peer info out of sync");
433-
let validator_idx = validator_info.validator_idx;
434433

435434
if !validator_info.public_key.verify_raw(&data, &signature) {
436435
tracing::warn!(
@@ -448,14 +447,14 @@ impl ValidatorSession {
448447
metrics::counter!(METRIC_INVALID_SIGNATURES_CACHED_TOTAL).increment(1);
449448

450449
if let Some(scope) = events_scope.upgrade() {
451-
scope.receive_signature(validator_idx, false);
450+
scope.receive_signature(peer_id.as_bytes(), false);
452451
}
453452

454453
break 'stored Default::default();
455454
}
456455

457456
if let Some(scope) = events_scope.upgrade() {
458-
scope.receive_signature(validator_idx, true);
457+
scope.receive_signature(peer_id.as_bytes(), true);
459458
}
460459

461460
total_weight += validator_info.weight;
@@ -708,7 +707,7 @@ impl SessionState {
708707
metrics::counter!(METRIC_INVALID_SIGNATURES_IN_TOTAL).increment(1);
709708

710709
if let Some(scope) = block.events_scope.upgrade() {
711-
scope.receive_signature(validator_info.validator_idx, false);
710+
scope.receive_signature(peer_id.as_bytes(), false);
712711
}
713712

714713
return Err(ValidationError::InvalidSignature);
@@ -749,7 +748,7 @@ impl SessionState {
749748
&& !was_sealed
750749
&& let Some(scope) = block.events_scope.upgrade()
751750
{
752-
scope.receive_signature(validator_info.validator_idx, true);
751+
scope.receive_signature(peer_id.as_bytes(), true);
753752
}
754753

755754
Ok(())

consensus/src/dag/commit/inspector.rs

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tycho_util::FastHashMap;
99
use crate::dag::DagRound;
1010
use crate::effects::{AltFormat, TaskResult};
1111
use crate::engine::MempoolConfig;
12-
use crate::intercom::{PeerSchedule, SubsetPeer};
12+
use crate::intercom::PeerSchedule;
1313
use crate::models::{
1414
CounterU16, DagPoint, Digest, IllFormedPoint, InvalidPoint, MempoolPeerCounters,
1515
MempoolPeerStats, PointId, PointInfo, Round, TransInvalidPoint,
@@ -39,7 +39,7 @@ struct NotReferenced {
3939
}
4040

4141
pub struct RoundInspector {
42-
stats: FastHashMap<SubsetPeer, MempoolPeerStats>,
42+
stats: FastHashMap<PeerId, MempoolPeerStats>,
4343
trans_invalid: Vec<TransInvalidPoint>,
4444
invalid: Vec<InvalidPoint>,
4545
ill_formed: Vec<IllFormedPoint>,
@@ -64,7 +64,7 @@ impl Default for RoundInspector {
6464
}
6565

6666
impl RoundInspector {
67-
pub fn take_stats(&mut self) -> FastHashMap<SubsetPeer, MempoolPeerStats> {
67+
pub fn take_stats(&mut self) -> FastHashMap<PeerId, MempoolPeerStats> {
6868
let capacity = if self.stats.capacity() >= self.stats.len() * 4 {
6969
self.stats.len() * 2
7070
} else {
@@ -107,16 +107,13 @@ impl RoundInspector {
107107
peer_schedule: &PeerSchedule,
108108
conf: &MempoolConfig,
109109
) -> TaskResult<()> {
110-
let (prev_v_idxs, curr_v_idxs, emit_stats) = {
110+
let emit_stats = {
111111
let guard = peer_schedule.atomic();
112-
let prev = guard.validator_idx_map_for(r_0.round().prev()).clone();
113-
let current = guard.validator_idx_map_for(r_0.round()).clone();
114112
// collator is run on another (sub)set than mempool, so slasher stats cannot be merged
115113
let no_stats_range = guard
116114
.next_epoch_start
117115
.map(|r| r - conf.consensus.max_consensus_lag_rounds.get()..r);
118-
let emit_stats = no_stats_range.is_none_or(|r| !r.contains(&r_0.round()));
119-
(prev, current, emit_stats)
116+
no_stats_range.is_none_or(|r| !r.contains(&r_0.round()))
120117
};
121118

122119
let leader_used = r_0
@@ -197,23 +194,18 @@ impl RoundInspector {
197194
}
198195
}
199196

200-
if let Some(&validator_idx) = curr_v_idxs.get(author) {
201-
match (self.stats)
202-
.entry(SubsetPeer {
203-
id: *author,
204-
validator_idx,
205-
})
206-
.or_insert_with(|| MempoolPeerStats::new(r_0.round().0))
207-
.add_in_order(&author_counters)
208-
{
209-
Ok(()) => {}
210-
Err(err) => tracing::error!(
211-
author = display(author.alt()),
212-
error = display(err),
213-
"cannot report peer stats, skipping"
214-
),
215-
};
216-
}
197+
match (self.stats)
198+
.entry(*author)
199+
.or_insert_with(|| MempoolPeerStats::new(r_0.round().0))
200+
.add_in_order(&author_counters)
201+
{
202+
Ok(()) => {}
203+
Err(err) => tracing::error!(
204+
author = display(author.alt()),
205+
error = display(err),
206+
"cannot report peer stats, skipping"
207+
),
208+
};
217209
}
218210

219211
// check references against evidences
@@ -247,13 +239,7 @@ impl RoundInspector {
247239
// will proceed with peers that made two consecutive points, forgive others
248240
continue;
249241
}
250-
let Some(&validator_idx) = prev_v_idxs.get(&signer) else {
251-
continue;
252-
};
253-
let Some(signer_stats) = self.stats.get_mut(&SubsetPeer {
254-
id: signer,
255-
validator_idx,
256-
}) else {
242+
let Some(signer_stats) = self.stats.get_mut(&signer) else {
257243
// this map contains full new v_set, but prev v_set is not guaranteed
258244
continue;
259245
};

consensus/src/dag/commit/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod inspector;
55
use std::ops::RangeInclusive;
66
use std::sync::atomic::Ordering;
77

8+
use tycho_network::PeerId;
89
use tycho_util::FastHashMap;
910

1011
use crate::dag::DagRound;
@@ -13,7 +14,7 @@ use crate::dag::commit::back::DagBack;
1314
use crate::dag::commit::inspector::RoundInspector;
1415
use crate::effects::{AltFmt, AltFormat, Cancelled, TaskResult};
1516
use crate::engine::MempoolConfig;
16-
use crate::intercom::{PeerSchedule, SubsetPeer};
17+
use crate::intercom::PeerSchedule;
1718
use crate::models::{AnchorData, AnchorStageRole, MempoolPeerStats, Round};
1819
use crate::moderator::JournalEvent;
1920

@@ -126,7 +127,7 @@ impl Committer {
126127
last_anchor: Round,
127128
peer_schedule: &PeerSchedule,
128129
conf: &MempoolConfig,
129-
) -> TaskResult<(FastHashMap<SubsetPeer, MempoolPeerStats>, Vec<JournalEvent>)> {
130+
) -> TaskResult<(FastHashMap<PeerId, MempoolPeerStats>, Vec<JournalEvent>)> {
130131
// in case previous anchor was triggered directly - rounds are already dropped
131132
let drained =
132133
(self.dag).drain_upto(last_anchor - conf.consensus.commit_history_rounds.get());

consensus/src/engine/committer_task.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::engine::round_watch::{RoundWatch, TopKnownAnchor};
1818
use crate::engine::{
1919
ConsensusConfigExt, DAG_ROUNDS_TO_DROP, EngineResult, MempoolConfig, NodeConfig,
2020
};
21-
use crate::intercom::{PeerSchedule, SubsetPeer};
21+
use crate::intercom::PeerSchedule;
2222
use crate::models::{
2323
AnchorData, MempoolOutput, MempoolPeerStats, MempoolStatsMergeError, PointInfo, Round,
2424
};
@@ -187,28 +187,19 @@ impl State {
187187
&inner.peer_schedule,
188188
round_ctx.conf(),
189189
)?;
190-
let mut stats_by_idx =
191-
FastHashMap::<u16, AnchorPeerStats>::with_capacity(stat_map.len());
192-
for (subset_peer, stats) in &stat_map {
190+
let mut anchor_stats = FastHashMap::with_capacity(stat_map.len());
191+
for (peer_id, stats) in &stat_map {
193192
let Some(counters) = stats.counters() else {
194193
continue;
195194
};
196-
// zero counters not emitted to keep away `validator_idx` collisions
197-
// in slasher (receiver side) at vset switch boundary
198195
if counters.points_proved.inner() == 0 {
199196
continue;
200197
}
201-
stats_by_idx
202-
.entry(subset_peer.validator_idx)
203-
.and_modify(|exist| {
204-
exist.points_proven = (exist.points_proven)
205-
.saturating_add(counters.points_proved.inner());
206-
})
207-
.or_insert(AnchorPeerStats {
208-
points_proven: counters.points_proved.inner(),
209-
});
198+
anchor_stats.insert(peer_id.0, AnchorPeerStats {
199+
points_proven: counters.points_proved.inner(),
200+
});
210201
}
211-
adata.stats.0 = Arc::new(stats_by_idx);
202+
adata.stats.0 = Arc::new(anchor_stats);
212203
all_stats.push(stat_map);
213204

214205
round_ctx.commit_metrics(&adata.anchor);
@@ -279,14 +270,10 @@ impl RoundCtx {
279270
metrics::counter!("tycho_mempool_stats_merge_errors", "kind" => kind).increment(1);
280271
}
281272

282-
fn meter_stats(&self, all_stats: Vec<FastHashMap<SubsetPeer, MempoolPeerStats>>) {
283-
let acc = match all_stats.first() {
284-
None => return,
285-
Some(m) => FastHashMap::<PeerId, _>::with_capacity(m.len() * 2),
286-
};
287-
let reduced = all_stats.into_iter().fold(acc, |mut acc, other_map| {
273+
fn meter_stats(&self, all_stats: Vec<FastHashMap<PeerId, MempoolPeerStats>>) {
274+
let reduced = all_stats.into_iter().reduce(|mut acc, other_map| {
288275
for (peer, stats) in other_map {
289-
acc.entry(peer.id)
276+
acc.entry(peer)
290277
.and_modify(|occupied| self.merge_stats(occupied, &stats))
291278
.or_insert(stats);
292279
}
@@ -297,6 +284,10 @@ impl RoundCtx {
297284
return; // reduce anyway to catch merge error
298285
}
299286

287+
let Some(reduced) = reduced else {
288+
return;
289+
};
290+
300291
macro_rules! emit_counters {
301292
($prefix:literal, $stats:expr, $labels:expr, [ $($field:ident),* $(,)? ]) => {
302293
$(

consensus/src/intercom/peer_schedule/stateless.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,24 +71,6 @@ impl PeerScheduleStateless {
7171
self.epoch_starts[2]
7272
}
7373

74-
pub fn validator_idx_map_for(&self, round: Round) -> &Arc<FastHashMap<PeerId, u16>> {
75-
let result = if self.next_epoch_start.is_some_and(|r| round >= r) {
76-
&self.validator_idx_maps[3]
77-
} else if round >= self.epoch_starts[2] {
78-
&self.validator_idx_maps[2]
79-
} else if round >= self.epoch_starts[1] {
80-
&self.validator_idx_maps[1]
81-
} else if round >= self.epoch_starts[0] {
82-
&self.validator_idx_maps[0]
83-
} else {
84-
&self.empty_validator_idx_map
85-
};
86-
if result.is_empty() {
87-
tracing::error!("empty validator idx remap for {round:?}: {:?}", self.alt());
88-
}
89-
result
90-
}
91-
9274
pub fn peers_ordered_for(&self, round: Round) -> &Arc<Vec<SubsetPeer>> {
9375
let result = if self.next_epoch_start.is_some_and(|r| round >= r) {
9476
&self.peer_vecs[3]

slasher-traits/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ pub use self::validator::{
66

77
mod mempool;
88
mod validator;
9+
10+
pub type PeerIdInner = [u8; 32];

slasher-traits/src/mempool.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::sync::Arc;
22

33
use tycho_util::FastHashMap;
44

5+
use crate::PeerIdInner;
6+
57
#[derive(Debug, Clone)]
68
pub struct CollatedAnchorStats {
79
pub range: AnchorStatsRange,
@@ -23,7 +25,7 @@ pub struct AnchorStatsRange {
2325
}
2426

2527
#[derive(Debug, Default, Clone)]
26-
pub struct AnchorStats(pub Arc<FastHashMap<u16, AnchorPeerStats>>);
28+
pub struct AnchorStats(pub Arc<FastHashMap<PeerIdInner, AnchorPeerStats>>);
2729

2830
#[derive(Debug, Clone)]
2931
pub struct AnchorPeerStats {

0 commit comments

Comments
 (0)