Skip to content

Commit 43c162e

Browse files
committed
feat(collator): mempool to slasher bridge
1 parent fef5e5f commit 43c162e

13 files changed

Lines changed: 192 additions & 11 deletions

File tree

collator/src/collator/anchors_cache.rs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::collections::VecDeque;
22
use std::sync::Arc;
33

44
use tycho_network::PeerId;
5+
use tycho_slasher_traits::{AnchorPeerStats, AnchorStats, AnchorStatsRange, CollatedAnchorStats};
6+
use tycho_util::FastHashMap;
57

68
use crate::collator::messages_reader::state::ext::ExternalKey;
79
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
@@ -15,6 +17,7 @@ pub struct AnchorInfo {
1517
#[allow(dead_code)]
1618
pub our_exts_count: usize,
1719
pub author: PeerId,
20+
pub stats: AnchorStats,
1821
}
1922

2023
impl AnchorInfo {
@@ -25,6 +28,7 @@ impl AnchorInfo {
2528
all_exts_count: anchor.externals.len(),
2629
our_exts_count,
2730
author: anchor.author,
31+
stats: anchor.stats.clone(),
2832
}
2933
}
3034
}
@@ -311,6 +315,62 @@ impl<'a> AnchorsCacheTransaction<'a> {
311315
self.cache.last_imported_anchor_info()
312316
}
313317

318+
pub fn collect_imported_anchor_stats(
319+
&self,
320+
prev_imported_chain_time: u64,
321+
next_chain_time: u64,
322+
) -> Option<CollatedAnchorStats> {
323+
fn merge_anchor_stats(acc: &mut FastHashMap<u16, AnchorPeerStats>, stats: &AnchorStats) {
324+
for (validator_idx, peer_stats) in stats.0.iter() {
325+
acc.entry(*validator_idx)
326+
.and_modify(|exists| {
327+
exists.points_proven =
328+
(exists.points_proven).saturating_add(peer_stats.points_proven);
329+
})
330+
.or_insert(AnchorPeerStats {
331+
points_proven: peer_stats.points_proven,
332+
});
333+
}
334+
}
335+
336+
if prev_imported_chain_time >= next_chain_time {
337+
return None;
338+
}
339+
340+
let mut prev_top_anchor = None;
341+
let mut top_anchor = None;
342+
let mut acc = None;
343+
344+
for info in &self.cache.imported_anchors_info_history {
345+
if info.ct <= prev_imported_chain_time {
346+
prev_top_anchor = Some(info.id);
347+
continue;
348+
}
349+
if info.ct > next_chain_time {
350+
break;
351+
}
352+
353+
top_anchor = Some(info.id);
354+
355+
match &mut acc {
356+
None => acc = Some(info.stats.0.as_ref().clone()),
357+
Some(acc) => merge_anchor_stats(acc, &info.stats),
358+
};
359+
}
360+
361+
if prev_top_anchor >= top_anchor {
362+
return None;
363+
}
364+
365+
Some(CollatedAnchorStats {
366+
range: AnchorStatsRange {
367+
prev_top_anchor: prev_top_anchor?,
368+
top_anchor: top_anchor?,
369+
},
370+
stats: AnchorStats(Arc::new(acc?)),
371+
})
372+
}
373+
314374
pub fn get(&self, index: usize) -> Option<(MempoolAnchorId, Arc<MempoolAnchor>)> {
315375
self.cache.get(index)
316376
}
@@ -334,16 +394,36 @@ impl Drop for AnchorsCacheTransaction<'_> {
334394

335395
#[cfg(test)]
336396
mod tests {
397+
use tycho_slasher_traits::AnchorPeerStats;
398+
337399
use super::*;
338400

339401
fn make_anchor(id: MempoolAnchorId, chain_time: u64) -> Arc<MempoolAnchor> {
402+
make_anchor_with_stats(id, chain_time, &[])
403+
}
404+
405+
fn make_anchor_with_stats(
406+
id: MempoolAnchorId,
407+
chain_time: u64,
408+
validator_idx_stats: &[(u16, u16)],
409+
) -> Arc<MempoolAnchor> {
340410
Arc::new(MempoolAnchor {
341411
id,
342412
prev_id: if id > 0 { Some(id - 1) } else { None },
343413
chain_time,
344414
author: PeerId([0; 32]),
345415
externals: Default::default(),
346-
stats: Default::default(),
416+
stats: AnchorStats(Arc::new(
417+
validator_idx_stats
418+
.iter()
419+
.map(|(validator_idx, points_proven)| {
420+
let stats = AnchorPeerStats {
421+
points_proven: *points_proven,
422+
};
423+
(*validator_idx, stats)
424+
})
425+
.collect(),
426+
)),
347427
})
348428
}
349429

@@ -366,6 +446,28 @@ mod tests {
366446
assert_eq!(cache.get(0).unwrap().0, 2);
367447
}
368448

449+
#[test]
450+
fn test_collect_imported_anchor_stats_uses_trimmed_interval() {
451+
let mut cache = AnchorsCache::default();
452+
cache.add(make_anchor_with_stats(10, 100, &[(1, 1)]), 0);
453+
cache.add(make_anchor_with_stats(11, 200, &[(1, 2), (2, 3)]), 0);
454+
cache.add(make_anchor_with_stats(12, 300, &[(1, 4)]), 0);
455+
cache.add(make_anchor_with_stats(13, 400, &[(2, 5)]), 0);
456+
457+
let mut tx = AnchorsCacheTransaction::new(&mut cache);
458+
tx.remove_last_imported_above(320);
459+
460+
let collated_stats = tx.collect_imported_anchor_stats(100, 320).unwrap();
461+
462+
assert_eq!(collated_stats.range, AnchorStatsRange {
463+
prev_top_anchor: 10,
464+
top_anchor: 12,
465+
});
466+
assert_eq!(collated_stats.stats.0.len(), 2);
467+
assert_eq!(collated_stats.stats.0.get(&1).unwrap().points_proven, 6);
468+
assert_eq!(collated_stats.stats.0.get(&2).unwrap().points_proven, 3);
469+
}
470+
369471
#[test]
370472
fn test_pop_front_rollback() {
371473
let mut cache = AnchorsCache::default();

collator/src/collator/do_collate/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tycho_block_util::queue::{QueueDiffStuff, QueueKey, SerializedQueueDiff};
1111
use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
1212
use tycho_core::global_config::ZerostateId;
1313
use tycho_core::storage::{NewBlockMeta, StoreStateHint};
14+
use tycho_slasher_traits::CollatedAnchorStats;
1415
use tycho_types::models::*;
1516
use tycho_types::num::Tokens;
1617
use tycho_types::prelude::*;
@@ -71,6 +72,7 @@ pub struct FinalizeCollationCtx {
7172
/// Do we have unprocessed messages in internals
7273
/// or externals queue after block collation
7374
pub has_unprocessed_messages: bool,
75+
pub collated_anchor_stats: Option<CollatedAnchorStats>,
7476
pub finalized: FinalizeBlockResult,
7577
pub reader_state: ReaderState,
7678
pub ref_mc_state_handle: RefMcStateHandle,
@@ -103,6 +105,7 @@ impl CollatorStdImpl {
103105
next_block_id_short,
104106
mc_data,
105107
collation_config,
108+
prev_imported_chain_time,
106109
wu_used_from_last_anchor,
107110
resume_collation_elapsed,
108111
prev_shard_data,
@@ -171,6 +174,7 @@ impl CollatorStdImpl {
171174
collation_data,
172175
mc_data,
173176
prev_shard_data,
177+
prev_imported_chain_time,
174178
shard_id: self.shard_id,
175179
collation_is_cancelled: CancellationFlag::new(),
176180
is_first_block_after_prev_master,
@@ -331,6 +335,7 @@ impl CollatorStdImpl {
331335
finalized,
332336
execute_result,
333337
final_result,
338+
collated_anchor_stats,
334339
} = match do_collate_res {
335340
Err(CollatorError::Cancelled(reason)) => {
336341
// cancel collation
@@ -373,6 +378,7 @@ impl CollatorStdImpl {
373378
} = self
374379
.finalize_collation(FinalizeCollationCtx {
375380
has_unprocessed_messages: final_result.has_unprocessed_messages,
381+
collated_anchor_stats,
376382
finalized,
377383
reader_state,
378384
ref_mc_state_handle,
@@ -483,6 +489,25 @@ impl CollatorStdImpl {
483489
// from shard and should use anchors only up to chosen next chain time.
484490
anchors_cache.remove_last_imported_above(next_chain_time);
485491

492+
let collated_anchor_stats = anchors_cache
493+
.collect_imported_anchor_stats(state.prev_imported_chain_time, next_chain_time);
494+
if let Some(collated_anchor_stats) = &collated_anchor_stats {
495+
tracing::debug!(target: tracing_targets::COLLATOR,
496+
prev_imported_chain_time = state.prev_imported_chain_time,
497+
next_chain_time,
498+
prev_top_anchor = collated_anchor_stats.range.prev_top_anchor,
499+
top_anchor = collated_anchor_stats.range.top_anchor,
500+
validators = collated_anchor_stats.stats.0.len(),
501+
"aggregated imported anchor stats for block",
502+
);
503+
} else if state.prev_imported_chain_time < next_chain_time {
504+
tracing::warn!(target: tracing_targets::COLLATOR,
505+
prev_imported_chain_time = state.prev_imported_chain_time,
506+
next_chain_time,
507+
"unable to aggregate imported anchor stats for block",
508+
);
509+
}
510+
486511
// Verify that anchor author matches created_by set before collation
487512
let Some(anchor_info) = anchors_cache.last_imported_anchor_info() else {
488513
return Err(CollatorError::Anyhow(anyhow::anyhow!(
@@ -691,6 +716,7 @@ impl CollatorStdImpl {
691716
finalized,
692717
execute_result,
693718
final_result,
719+
collated_anchor_stats,
694720
},
695721
prepared_queue_diff.tx,
696722
))
@@ -1128,6 +1154,7 @@ impl CollatorStdImpl {
11281154

11291155
let FinalizeCollationCtx {
11301156
has_unprocessed_messages,
1157+
collated_anchor_stats,
11311158
finalized,
11321159
reader_state,
11331160
ref_mc_state_handle,
@@ -1214,6 +1241,7 @@ impl CollatorStdImpl {
12141241
.on_block_candidate(BlockCollationResult {
12151242
collation_session_id: self.collation_session.id(),
12161243
candidate: finalized.block_candidate,
1244+
collated_anchor_stats,
12171245
prev_mc_block_id: finalized.old_mc_data.block_id,
12181246
mc_data: finalized.mc_data.clone(),
12191247
collation_config: collation_config.clone(),

collator/src/collator/do_collate/phase.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub struct ActualState {
2020
pub collation_data: Box<BlockCollationData>,
2121
pub mc_data: Arc<McData>,
2222
pub prev_shard_data: PrevData,
23+
pub prev_imported_chain_time: u64,
2324
pub shard_id: ShardIdent,
2425
/// For graceful collation cancellation
2526
pub collation_is_cancelled: CancellationFlag,

collator/src/collator/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ impl CollatorStdImpl {
538538
// collator will import next anchor for sure
539539
// because processed_to anchor will be equal to last inported
540540
author: PeerId(HashBytes::ZERO.0),
541+
stats: Default::default(),
541542
});
542543
}
543544

@@ -949,6 +950,7 @@ impl CollatorStdImpl {
949950
next_block_id_short,
950951
mc_data: new_mc_data,
951952
collation_config,
953+
prev_imported_chain_time: prev_shard_data.gen_chain_time(),
952954
wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
953955
resume_collation_elapsed,
954956
prev_shard_data: Some(prev_shard_data),
@@ -1051,6 +1053,7 @@ impl CollatorStdImpl {
10511053
Ok(Box::new(WorkingState {
10521054
next_block_id_short,
10531055
mc_data,
1056+
prev_imported_chain_time: prev_shard_data.gen_chain_time(),
10541057
wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
10551058
resume_collation_elapsed: Duration::ZERO,
10561059
reader_state: ReaderState::new(prev_shard_data.processed_upto()),

collator/src/collator/tests/collator_tests.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,10 @@ fn test_get_anchors_processing_info() {
509509
prev_gen_chain_time,
510510
);
511511
assert_eq!(anchors_proc_info.last_imported_in_block_id, prev_block_id);
512+
assert_eq!(
513+
anchors_proc_info.current_shard_last_imported_chain_time,
514+
prev_gen_chain_time,
515+
);
512516

513517
//======
514518
// collated master block 1:968, it used the same shard block 0:17
@@ -563,6 +567,10 @@ fn test_get_anchors_processing_info() {
563567
prev_gen_chain_time,
564568
);
565569
assert_eq!(anchors_proc_info.last_imported_in_block_id, prev_block_id);
570+
assert_eq!(
571+
anchors_proc_info.current_shard_last_imported_chain_time,
572+
prev_gen_chain_time,
573+
);
566574

567575
//======
568576
// collated master block 1:1005, it used the same shard block 0:17
@@ -623,6 +631,10 @@ fn test_get_anchors_processing_info() {
623631
anchors_proc_info.last_imported_in_block_id,
624632
mc_data.block_id,
625633
);
634+
assert_eq!(
635+
anchors_proc_info.current_shard_last_imported_chain_time,
636+
prev_gen_chain_time,
637+
);
626638

627639
//======
628640
// when previous shard state has no processed_to and master has one, take from mc
@@ -651,6 +663,10 @@ fn test_get_anchors_processing_info() {
651663
anchors_proc_info.last_imported_in_block_id,
652664
mc_data.block_id,
653665
);
666+
assert_eq!(
667+
anchors_proc_info.current_shard_last_imported_chain_time,
668+
prev_gen_chain_time,
669+
);
654670

655671
//======
656672
// if previous block seqno is ahead from top shard seqno, info from mc must be ignored
@@ -685,6 +701,10 @@ fn test_get_anchors_processing_info() {
685701
anchors_proc_info.last_imported_in_block_id,
686702
prev_block_id_with_other_seqno,
687703
);
704+
assert_eq!(
705+
anchors_proc_info.current_shard_last_imported_chain_time,
706+
prev_gen_chain_time,
707+
);
688708

689709
//======
690710
// for masterchain info is always taken from previous data
@@ -722,4 +742,8 @@ fn test_get_anchors_processing_info() {
722742
anchors_proc_info.last_imported_in_block_id,
723743
master_prev_block_id,
724744
);
745+
assert_eq!(
746+
anchors_proc_info.current_shard_last_imported_chain_time,
747+
master_prev_gen_chain_time,
748+
);
725749
}

collator/src/collator/types.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tycho_block_util::queue::QueuePartitionIdx;
1212
use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
1313
use tycho_core::global_config::MempoolGlobalConfig;
1414
use tycho_executor::{AccountMeta, PublicLibraryChange, TransactionMeta};
15+
use tycho_slasher_traits::CollatedAnchorStats;
1516
use tycho_types::boc;
1617
use tycho_types::cell::{Cell, CellFamily, HashBytes, Lazy, UsageTree, UsageTreeMode};
1718
use tycho_types::dict::{self, Dict};
@@ -42,6 +43,7 @@ pub(super) struct WorkingState {
4243
pub next_block_id_short: BlockIdShort,
4344
pub mc_data: Arc<McData>,
4445
pub collation_config: Arc<CollationConfig>,
46+
pub prev_imported_chain_time: u64,
4547
pub wu_used_from_last_anchor: u64,
4648
pub resume_collation_elapsed: Duration,
4749
pub prev_shard_data: Option<PrevData>,
@@ -1254,6 +1256,7 @@ pub struct CollationResult {
12541256
pub final_result: FinalResult,
12551257
pub finalized: FinalizeBlockResult,
12561258
pub execute_result: ExecuteResult,
1259+
pub collated_anchor_stats: Option<CollatedAnchorStats>,
12571260
}
12581261

12591262
pub struct FinalResult {

collator/src/manager/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,10 +1100,12 @@ where
11001100
} else {
11011101
let validator = self.validator.clone();
11021102
let validation_session_id = session_info.get_validation_session_id();
1103+
let collated_anchor_stats = collation_result.collated_anchor_stats.clone();
11031104
let dispatcher = self.dispatcher.clone();
11041105
tokio::spawn(async move {
1105-
let validation_result =
1106-
validator.validate(validation_session_id, &block_id).await;
1106+
let validation_result = validator
1107+
.validate(validation_session_id, &block_id, collated_anchor_stats)
1108+
.await;
11071109

11081110
match validation_result {
11091111
Ok(status) => {

0 commit comments

Comments
 (0)