diff --git a/narwhal/consensus/src/bullshark.rs b/narwhal/consensus/src/bullshark.rs index e30057c781e58..c7928b6f9f48c 100644 --- a/narwhal/consensus/src/bullshark.rs +++ b/narwhal/consensus/src/bullshark.rs @@ -8,11 +8,14 @@ use crate::{ }; use config::{Committee, Stake}; use crypto::PublicKey; +use fastcrypto::hash::Hash; use fastcrypto::traits::EncodeDecodeBase64; use std::{collections::BTreeSet, sync::Arc}; use tokio::time::Instant; use tracing::{debug, trace}; -use types::{Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, Round}; +use types::{ + Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, ReputationScores, Round, +}; #[cfg(test)] #[path = "tests/bullshark_tests.rs"] @@ -48,6 +51,9 @@ pub struct Bullshark { pub last_leader_election: LastRound, /// The most recent round of inserted certificate pub max_inserted_certificate_round: Round, + /// The number of committed subdags that will trigger the schedule change and reputation + /// score reset. + pub num_sub_dags_per_schedule: u64, } impl ConsensusProtocol for Bullshark { @@ -163,10 +169,16 @@ impl ConsensusProtocol for Bullshark { } let next_sub_dag_index = state.latest_sub_dag_index + 1; + + // We update the reputation score stored in state + let reputation_score = + self.update_reputation_score(state, &sequence, next_sub_dag_index); + let sub_dag = CommittedSubDag { certificates: sequence, leader: leader.clone(), sub_dag_index: next_sub_dag_index, + reputation_score, }; // Persist the update. @@ -176,6 +188,7 @@ impl ConsensusProtocol for Bullshark { // Increase the global consensus index. state.latest_sub_dag_index = next_sub_dag_index; + state.last_committed_leader = Some(sub_dag.leader.digest()); committed_sub_dags.push(sub_dag); } @@ -236,6 +249,7 @@ impl Bullshark { store: Arc, gc_depth: Round, metrics: Arc, + num_sub_dags_per_schedule: u64, ) -> Self { Self { committee, @@ -245,6 +259,7 @@ impl Bullshark { last_leader_election: LastRound::default(), max_inserted_certificate_round: 0, metrics, + num_sub_dags_per_schedule, } } @@ -314,4 +329,46 @@ impl Bullshark { // Return its certificate and the certificate's digest. dag.get(&round).and_then(|x| x.get(&leader)) } + + /// Updates and calculates the reputation score for the current commit managing any internal state. + /// It returns the updated reputation score. + fn update_reputation_score( + &mut self, + state: &mut ConsensusState, + committed_sequence: &[Certificate], + sub_dag_index: u64, + ) -> ReputationScores { + // we reset the scores for every schedule change window. + // TODO: when schedule change is implemented we should probably change a little bit + // this logic here. + if sub_dag_index % self.num_sub_dags_per_schedule == 0 { + state.last_consensus_reputation_score = ReputationScores::default() + } + + // update the score for the previous leader. If no previous leader exists, + // then this is the first time we commit a leader, so no score update takes place + if let Some(previous_leader) = state.last_committed_leader { + for certificate in committed_sequence { + // TODO: we could iterate only the certificates of the round above the previous leader's round + if certificate + .header + .parents + .iter() + .any(|digest| *digest == previous_leader) + { + state + .last_consensus_reputation_score + .add_score(certificate.origin(), 1); + } + } + } + + // we check if this is the last subdag of the current schedule. 
If yes then we mark the + // scores as final_of_schedule = true so any downstream user can know that those are the last + // ones calculated for the current schedule. + state.last_consensus_reputation_score.final_of_schedule = + (sub_dag_index + 1) % self.num_sub_dags_per_schedule == 0; + + state.last_consensus_reputation_score.clone() + } }
diff --git a/narwhal/consensus/src/consensus.rs b/narwhal/consensus/src/consensus.rs index 2a62ca0b8deb5..982959e8fd823 100644 --- a/narwhal/consensus/src/consensus.rs +++ b/narwhal/consensus/src/consensus.rs @@ -18,8 +18,8 @@ use storage::CertificateStore; use tokio::{sync::watch, task::JoinHandle}; use tracing::{debug, info, instrument}; use types::{ - metered_channel, Certificate, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver, - ConsensusStore, Round, Timestamp, + metered_channel, Certificate, CertificateDigest, CommittedSubDag, CommittedSubDagShell, + ConditionalBroadcastReceiver, ConsensusStore, ReputationScores, Round, Timestamp, }; #[cfg(test)] @@ -38,6 +38,11 @@ pub struct ConsensusState { pub last_committed: HashMap, /// Used to populate the index in the sub-dag construction. pub latest_sub_dag_index: SequenceNumber, + /// The last calculated consensus reputation score + pub last_consensus_reputation_score: ReputationScores, + /// The last committed sub dag leader. This allows us to calculate the reputation score of the nodes + /// that voted for the last leader. + pub last_committed_leader: Option<CertificateDigest>, /// Keeps the latest committed certificate (and its parents) for every authority. Anything older /// must be regularly cleaned up through the function `update`. pub dag: Dag, @@ -52,6 +57,8 @@ impl ConsensusState { last_committed: Default::default(), latest_sub_dag_index: 0, dag: Default::default(), + last_consensus_reputation_score: ReputationScores::default(), + last_committed_leader: None, metrics, } } @@ -59,7 +66,7 @@ impl ConsensusState { pub fn new_from_store( metrics: Arc, recover_last_committed: HashMap, - latest_sub_dag_index: SequenceNumber, + latest_sub_dag: Option<CommittedSubDagShell>, cert_store: CertificateStore, ) -> Self { let last_committed_round = *recover_last_committed .expect("error when recovering DAG from store"); metrics.recovered_consensus_state.inc(); + let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) = + latest_sub_dag + .map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader))) + .unwrap_or_default(); + Self { last_committed_round, last_committed: recover_last_committed, + last_consensus_reputation_score, latest_sub_dag_index, + last_committed_leader, dag, metrics, } @@ -246,11 +260,11 @@ where ) -> JoinHandle<()> { // The consensus state (everything else is immutable). let recovered_last_committed = store.read_last_committed(); - let latest_sub_dag_index = store.get_latest_sub_dag_index(); + let latest_sub_dag = store.get_latest_sub_dag(); let state = ConsensusState::new_from_store( metrics.clone(), recovered_last_committed, - latest_sub_dag_index, + latest_sub_dag, cert_store, ); tx_consensus_round_updates
diff --git a/narwhal/consensus/src/tests/bullshark_tests.rs b/narwhal/consensus/src/tests/bullshark_tests.rs index 7d91f13fb02a9..847f476c4c3bc 100644 --- a/narwhal/consensus/src/tests/bullshark_tests.rs +++ b/narwhal/consensus/src/tests/bullshark_tests.rs @@ -22,6 +22,8 @@ use types::PreSubscribedBroadcastSender; // the leader of round 2.
#[tokio::test] async fn commit_one() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; + let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); // Make certificates for rounds 1 and 2. @@ -53,7 +55,13 @@ async fn commit_one() { let cert_store = make_certificate_store(&test_utils::temp_dir()); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let _consensus_handle = Consensus::spawn( committee, @@ -77,7 +85,7 @@ async fn commit_one() { // Ensure the first 4 ordered certificates are from round 1 (they are the parents of the committed // leader); then the leader's certificate should be committed. - let committed_sub_dag = rx_output.recv().await.unwrap(); + let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap(); let mut sequence = committed_sub_dag.certificates.into_iter(); for _ in 1..=4 { let output = sequence.next().unwrap(); @@ -85,12 +93,17 @@ async fn commit_one() { } let output = sequence.next().unwrap(); assert_eq!(output.round(), 2); + + // AND the reputation scores have not been updated + assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0); } // Run for 8 dag rounds with one dead node node (that is not a leader). We should commit the leaders of // rounds 2, 4, and 6. #[tokio::test] async fn dead_node() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; + // Make the certificates. let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); @@ -118,7 +131,13 @@ async fn dead_node() { let cert_store = make_certificate_store(&test_utils::temp_dir()); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let _consensus_handle = Consensus::spawn( committee, @@ -143,9 +162,11 @@ async fn dead_node() { // We should commit 4 leaders (rounds 2, 4, 6, and 8). let mut committed = Vec::new(); + let mut committed_sub_dags: Vec = Vec::new(); for _commit_rounds in 1..=4 { let committed_sub_dag = rx_output.recv().await.unwrap(); - committed.extend(committed_sub_dag.certificates); + committed.extend(committed_sub_dag.certificates.clone()); + committed_sub_dags.push(committed_sub_dag); } let mut sequence = committed.into_iter(); @@ -156,12 +177,27 @@ async fn dead_node() { } let output = sequence.next().unwrap(); assert_eq!(output.round(), 8); + + // AND check that the consensus scores are the expected ones + for (index, sub_dag) in committed_sub_dags.iter().enumerate() { + // For the first commit we don't expect to have any score updates + if index == 0 { + assert_eq!(sub_dag.reputation_score.total_authorities(), 0); + } else { + // For any other commit we expect to always have a +1 score for each authority, as everyone + // always votes for the leader + for score in sub_dag.reputation_score.scores_per_authority.values() { + assert_eq!(*score as usize, index); + } + } + } } // Run for 5 dag rounds. The leader of round 2 does not have enough support, but the leader of // round 4 does. The leader of rounds 2 and 4 should thus be committed (because they are linked). 
#[tokio::test] async fn not_enough_support() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); let mut keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); @@ -238,7 +274,13 @@ async fn not_enough_support() { let cert_store = make_certificate_store(&test_utils::temp_dir()); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let _consensus_handle = Consensus::spawn( committee, @@ -261,7 +303,7 @@ async fn not_enough_support() { } // We should commit 2 leaders (rounds 2 and 4). - let committed_sub_dag = rx_output.recv().await.unwrap(); + let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap(); let mut sequence = committed_sub_dag.certificates.into_iter(); for _ in 1..=3 { let output = sequence.next().unwrap(); @@ -270,7 +312,10 @@ async fn not_enough_support() { let output = sequence.next().unwrap(); assert_eq!(output.round(), 2); - let committed_sub_dag = rx_output.recv().await.unwrap(); + // AND no scores exist for leader 2 , as this is the first commit + assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0); + + let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap(); let mut sequence = committed_sub_dag.certificates.into_iter(); for _ in 1..=3 { let output = sequence.next().unwrap(); @@ -282,12 +327,26 @@ async fn not_enough_support() { } let output = sequence.next().unwrap(); assert_eq!(output.round(), 4); + + // AND scores should be updated with everyone that has voted for leader of round 2. + // Only node 0 has voted for the leader of this round, so only 1 score should exist + // with value 1 + assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 1); + + let node_0_name = &keys[0]; + let score = committed_sub_dag + .reputation_score + .scores_per_authority + .get(node_0_name) + .unwrap(); + assert_eq!(*score, 1); } // Run for 7 dag rounds. Node 0 (the leader of round 2) is missing for rounds 1 and 2, // and reappears from round 3. #[tokio::test] async fn missing_leader() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); let mut keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); @@ -328,7 +387,13 @@ async fn missing_leader() { let cert_store = make_certificate_store(&test_utils::temp_dir()); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let _consensus_handle = Consensus::spawn( committee, @@ -351,7 +416,7 @@ async fn missing_leader() { } // Ensure the commit sequence is as expected. 
- let committed_sub_dag = rx_output.recv().await.unwrap(); + let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap(); let mut sequence = committed_sub_dag.certificates.into_iter(); for _ in 1..=3 { let output = sequence.next().unwrap(); @@ -367,6 +432,9 @@ async fn missing_leader() { } let output = sequence.next().unwrap(); assert_eq!(output.round(), 4); + + // AND no scores exist since this is the first commit that has happened + assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0); } // Run for 11 dag rounds in ideal conditions (all nodes reference all other nodes). @@ -377,6 +445,7 @@ async fn committed_round_after_restart() { let committee = fixture.committee(); let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); let epoch = committee.epoch(); + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; // Make certificates for rounds 1 to 11. let genesis = Certificate::genesis(&committee) @@ -399,7 +468,13 @@ async fn committed_round_after_restart() { let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let handle = Consensus::spawn( committee.clone(), @@ -462,6 +537,8 @@ async fn committed_round_after_restart() { /// from round 2. Certificate 2 should not get committed. #[tokio::test] async fn delayed_certificates_are_rejected() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; + let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); @@ -479,7 +556,13 @@ async fn delayed_certificates_are_rejected() { let store = make_consensus_store(&test_utils::temp_dir()); let mut state = ConsensusState::new(metrics.clone()); - let mut bullshark = Bullshark::new(committee, store, gc_depth, metrics); + let mut bullshark = Bullshark::new( + committee, + store, + gc_depth, + metrics, + NUM_SUB_DAGS_PER_SCHEDULE, + ); // Populate DAG with the rounds up to round 5 so we trigger commits let mut all_subdags = Vec::new(); @@ -506,6 +589,8 @@ async fn delayed_certificates_are_rejected() { #[tokio::test] async fn submitting_equivocating_certificate_should_error() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; + let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); @@ -523,7 +608,13 @@ async fn submitting_equivocating_certificate_should_error() { let store = make_consensus_store(&test_utils::temp_dir()); let mut state = ConsensusState::new(metrics.clone()); - let mut bullshark = Bullshark::new(committee.clone(), store, gc_depth, metrics); + let mut bullshark = Bullshark::new( + committee.clone(), + store, + gc_depth, + metrics, + NUM_SUB_DAGS_PER_SCHEDULE, + ); // Populate DAG with all the certificates for certificate in certificates.clone() { @@ -558,6 +649,78 @@ async fn submitting_equivocating_certificate_should_error() { } } +/// Advance the DAG for 50 rounds, while we change "schedule" for every 5 subdag commits. 
+#[tokio::test] +async fn reset_consensus_scores_on_every_schedule_change() { + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 5; + + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); + let epoch = committee.epoch(); + let gc_depth = 10; + + // Make certificates for rounds 1 to 50. + let genesis = Certificate::genesis(&committee) + .iter() + .map(|x| x.digest()) + .collect::>(); + let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); + let (certificates, _) = + test_utils::make_certificates_with_epoch(&committee, 1..=50, epoch, &genesis, &keys); + + let store = make_consensus_store(&test_utils::temp_dir()); + let mut state = ConsensusState::new(metrics.clone()); + let mut bullshark = Bullshark::new( + committee, + store, + gc_depth, + metrics, + NUM_SUB_DAGS_PER_SCHEDULE, + ); + + // Populate DAG with the rounds up to round 50 so we trigger commits + let mut all_subdags = Vec::new(); + for certificate in certificates { + let (_, committed_subdags) = bullshark + .process_certificate(&mut state, certificate) + .unwrap(); + all_subdags.extend(committed_subdags); + } + + // Ensure that the scores are reset on every schedule change and otherwise grow by one per commit + let mut current_score = 0; + for sub_dag in all_subdags { + // The first commit has no scores + if sub_dag.sub_dag_index == 1 { + assert_eq!(sub_dag.reputation_score.total_authorities(), 0); + } else if sub_dag.sub_dag_index % NUM_SUB_DAGS_PER_SCHEDULE == 0 { + // On every 5th commit we reset the scores and count from the beginning with + // scores updated to 1, as we now expect every node to have voted for the previous leader. + for score in sub_dag.reputation_score.scores_per_authority.values() { + assert_eq!(*score as usize, 1); + } + current_score = 1; + } else { + // On every other commit the scores get calculated incrementally with +1 score + // for every commit. + current_score += 1; + + for score in sub_dag.reputation_score.scores_per_authority.values() { + assert_eq!(*score, current_score); + } + + if (sub_dag.sub_dag_index + 1) % NUM_SUB_DAGS_PER_SCHEDULE == 0 { + // if this is the last score update for the current schedule, then + // make sure that `final_of_schedule` is true + assert!(sub_dag.reputation_score.final_of_schedule); + } else { + assert!(!sub_dag.reputation_score.final_of_schedule); + } + } + } +} + // Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit // the leader of round 2. Then shutdown consensus and restart in a new epoch. #[tokio::test] @@ -565,6 +728,7 @@ async fn restart_with_new_committee() { let fixture = CommitteeFixture::builder().build(); let mut committee = fixture.committee(); let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; // Run for a few epochs.
for epoch in 0..5 { @@ -579,7 +743,13 @@ async fn restart_with_new_committee() { let cert_store = make_certificate_store(&test_utils::temp_dir()); let gc_depth = 50; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new(committee.clone(), store.clone(), gc_depth, metrics.clone()); + let bullshark = Bullshark::new( + committee.clone(), + store.clone(), + gc_depth, + metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, + ); let handle = Consensus::spawn( committee.clone(), diff --git a/narwhal/consensus/src/tests/consensus_tests.rs b/narwhal/consensus/src/tests/consensus_tests.rs index 8130356362474..b37f14f3743c8 100644 --- a/narwhal/consensus/src/tests/consensus_tests.rs +++ b/narwhal/consensus/src/tests/consensus_tests.rs @@ -16,7 +16,7 @@ use crate::bullshark::Bullshark; use crate::metrics::ConsensusMetrics; use crate::Consensus; use crate::NUM_SHUTDOWN_RECEIVERS; -use types::{Certificate, PreSubscribedBroadcastSender}; +use types::{Certificate, PreSubscribedBroadcastSender, ReputationScores}; /// This test is trying to compare the output of the Consensus algorithm when: /// (1) running without any crash for certificates processed from round 1 to 5 (inclusive) @@ -37,19 +37,20 @@ async fn test_consensus_recovery_with_bullshark() { let consensus_store = storage.consensus_store; let certificate_store = storage.certificate_store; + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; // AND Setup consensus let fixture = CommitteeFixture::builder().build(); let committee = fixture.committee(); - // AND make certificates for rounds 1 to 5 (inclusive) + // AND make certificates for rounds 1 to 7 (inclusive) let keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect(); let genesis = Certificate::genesis(&committee) .iter() .map(|x| x.digest()) .collect::>(); let (certificates, _next_parents) = - test_utils::make_optimal_certificates(&committee, 1..=5, &genesis, &keys); + test_utils::make_optimal_certificates(&committee, 1..=7, &genesis, &keys); // AND Spawn the consensus engine. let (tx_waiter, rx_waiter) = test_utils::test_channel!(100); @@ -66,6 +67,7 @@ async fn test_consensus_recovery_with_bullshark() { consensus_store.clone(), gc_depth, metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, ); let consensus_handle = Consensus::spawn( @@ -94,25 +96,29 @@ async fn test_consensus_recovery_with_bullshark() { // * 4 certificates from round 1 // * 4 certificates from round 2 // * 4 certificates from round 3 - // * 1 certificate from round 4 (the leader of last round) + // * 4 certificates from round 4 + // * 4 certificates from round 5 + // * 1 certificates from round 6 (the leader of last round) // - // In total we should see 13 certificates committed + // In total we should see 21 certificates committed let mut consensus_index_counter = 1; // hold all the certificates that get committed when consensus runs // without any crash. 
let mut committed_output_no_crash: Vec = Vec::new(); + let mut score_no_crash: ReputationScores = ReputationScores::default(); 'main: while let Some(sub_dag) = rx_output.recv().await { + score_no_crash = sub_dag.reputation_score.clone(); assert_eq!(sub_dag.sub_dag_index, consensus_index_counter); for output in sub_dag.certificates { - assert!(output.round() <= 4); + assert!(output.round() <= 6); committed_output_no_crash.push(output.clone()); - // we received the leader of round 4, now stop as we don't expect to see any other + // we received the leader of round 6, now stop as we don't expect to see any other // certificate from that or higher round. - if output.round() == 4 { + if output.round() == 6 { break 'main; } } @@ -125,12 +131,12 @@ async fn test_consensus_recovery_with_bullshark() { for key in keys.clone() { let last_round = *last_committed.get(&key).unwrap(); - // For the leader of round 4 we expect to have last committed round of 4. - if key == Bullshark::leader_authority(&committee, 4) { - assert_eq!(last_round, 4); + // For the leader of round 6 we expect to have last committed round of 6. + if key == Bullshark::leader_authority(&committee, 6) { + assert_eq!(last_round, 6); } else { - // For the others should be 3. - assert_eq!(last_round, 3); + // For the others should be 5. + assert_eq!(last_round, 5); } } @@ -155,6 +161,7 @@ async fn test_consensus_recovery_with_bullshark() { consensus_store.clone(), gc_depth, metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, ); let consensus_handle = Consensus::spawn( @@ -171,15 +178,15 @@ async fn test_consensus_recovery_with_bullshark() { ); // WHEN we send same certificates but up to round 3 (inclusive) - // Then we store all the certificates up to round 4 so we can let the recovery algorithm + // Then we store all the certificates up to round 6 so we can let the recovery algorithm // restore the consensus. - // We omit round 5 so we can feed those later after "crash" to trigger a new leader + // We omit round 7 so we can feed those later after "crash" to trigger a new leader // election round and commit. for certificate in certificates.iter() { if certificate.header.round <= 3 { tx_waiter.send(certificate.clone()).await.unwrap(); } - if certificate.header.round <= 4 { + if certificate.header.round <= 6 { certificate_store.write(certificate.clone()).unwrap(); } } @@ -222,6 +229,7 @@ async fn test_consensus_recovery_with_bullshark() { consensus_store.clone(), gc_depth, metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, ); let _consensus_handle = Consensus::spawn( @@ -247,16 +255,18 @@ async fn test_consensus_recovery_with_bullshark() { // AND capture the committed output let mut committed_output_after_crash: Vec = Vec::new(); + let mut score_with_crash: ReputationScores = ReputationScores::default(); 'main: while let Some(sub_dag) = rx_output.recv().await { + score_with_crash = sub_dag.reputation_score.clone(); for output in sub_dag.certificates { assert!(output.round() >= 2); committed_output_after_crash.push(output.clone()); - // we received the leader of round 4, now stop as we don't expect to see any other + // we received the leader of round 6, now stop as we don't expect to see any other // certificate from that or higher round. 
- if output.round() == 4 { + if output.round() == 6 { break 'main; } } @@ -272,6 +282,18 @@ async fn test_consensus_recovery_with_bullshark() { let all_output_with_crash = committed_output_before_crash; assert_eq!(committed_output_no_crash, all_output_with_crash); + + // AND ensure that scores are exactly the same + assert_eq!(score_with_crash.scores_per_authority.len(), 4); + assert_eq!(score_with_crash, score_no_crash); + assert_eq!( + score_with_crash + .scores_per_authority + .into_iter() + .filter(|(_, score)| *score == 2) + .count(), + 4 + ); } fn setup_tracing() -> TelemetryGuards { diff --git a/narwhal/consensus/src/tusk.rs b/narwhal/consensus/src/tusk.rs index 1c53c38b4276f..31d2856287174 100644 --- a/narwhal/consensus/src/tusk.rs +++ b/narwhal/consensus/src/tusk.rs @@ -9,7 +9,9 @@ use config::{Committee, Stake}; use fastcrypto::{hash::Hash, traits::EncodeDecodeBase64}; use std::{collections::HashMap, sync::Arc}; use tracing::debug; -use types::{Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, Round}; +use types::{ + Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, ReputationScores, Round, +}; #[cfg(any(test))] #[path = "tests/tusk_tests.rs"] @@ -103,6 +105,7 @@ impl ConsensusProtocol for Tusk { certificates: sequence, leader: leader.clone(), sub_dag_index: next_sub_dag_index, + reputation_score: ReputationScores::default(), // TODO compute the scores for Tusk as well }; // Persist the update. diff --git a/narwhal/executor/src/lib.rs b/narwhal/executor/src/lib.rs index d5414f4448148..2e322ecf7be77 100644 --- a/narwhal/executor/src/lib.rs +++ b/narwhal/executor/src/lib.rs @@ -121,6 +121,7 @@ pub async fn get_restored_consensus_output( certificates, leader, sub_dag_index, + reputation_score: compressed_sub_dag.reputation_score, }); } diff --git a/narwhal/executor/tests/consensus_integration_tests.rs b/narwhal/executor/tests/consensus_integration_tests.rs index d6d02e1d91db8..68c014adbae0e 100644 --- a/narwhal/executor/tests/consensus_integration_tests.rs +++ b/narwhal/executor/tests/consensus_integration_tests.rs @@ -16,7 +16,7 @@ use telemetry_subscribers::TelemetryGuards; use test_utils::{cluster::Cluster, temp_dir, CommitteeFixture}; use tokio::sync::watch; -use types::{Certificate, PreSubscribedBroadcastSender, TransactionProto}; +use types::{Certificate, PreSubscribedBroadcastSender, Round, TransactionProto}; #[tokio::test] async fn test_recovery() { @@ -55,13 +55,15 @@ async fn test_recovery() { let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - let gc_depth = 50; + const GC_DEPTH: Round = 50; + const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let bullshark = Bullshark::new( committee.clone(), consensus_store.clone(), - gc_depth, + GC_DEPTH, metrics.clone(), + NUM_SUB_DAGS_PER_SCHEDULE, ); let _consensus_handle = Consensus::spawn( diff --git a/narwhal/node/src/primary_node.rs b/narwhal/node/src/primary_node.rs index d35efddfa6a9c..860751ba7fb79 100644 --- a/narwhal/node/src/primary_node.rs +++ b/narwhal/node/src/primary_node.rs @@ -45,6 +45,10 @@ struct PrimaryNodeInner { impl PrimaryNodeInner { /// The default channel capacity. pub const CHANNEL_CAPACITY: usize = 1_000; + /// The window where the schedule change takes place in consensus. It represents number + /// of committed sub dags. + /// TODO: move this to node properties + const CONSENSUS_SCHEDULE_CHANGE_SUB_DAGS: u64 = 10_000; // Starts the primary node with the provided info. 
If the node is already running then this // method will return an error instead. @@ -325,6 +329,7 @@ impl PrimaryNodeInner { store.consensus_store.clone(), parameters.gc_depth, consensus_metrics.clone(), + Self::CONSENSUS_SCHEDULE_CHANGE_SUB_DAGS, ); let consensus_handles = Consensus::spawn( committee.clone(),
diff --git a/narwhal/types/src/consensus.rs b/narwhal/types/src/consensus.rs index 9828744d195c7..704aca3ac846f 100644 --- a/narwhal/types/src/consensus.rs +++ b/narwhal/types/src/consensus.rs @@ -33,6 +33,8 @@ pub struct CommittedSubDag { pub leader: Certificate, /// The index associated with this CommittedSubDag pub sub_dag_index: SequenceNumber, + /// The reputation score calculated so far for the nodes + pub reputation_score: ReputationScores, } impl CommittedSubDag { @@ -63,6 +65,32 @@ impl CommittedSubDag { } } +#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)] +pub struct ReputationScores { + /// Holds the score for every authority. If an authority is not amongst + /// the records of the map then we assume that its score is zero. + pub scores_per_authority: HashMap<PublicKey, u64>, + /// When true it notifies us that those scores will be the last updated scores of the + /// current schedule before they get reset for the next schedule and start + /// scoring from the beginning. In practice we can leverage this information to + /// use the scores during the next schedule until the next final ones are calculated. + pub final_of_schedule: bool, +} + +impl ReputationScores { + /// Adds the provided `score` to the existing score for the provided `authority` + pub fn add_score(&mut self, authority: PublicKey, score: u64) { + self.scores_per_authority + .entry(authority) + .and_modify(|value| *value += score) + .or_insert(score); + } + + pub fn total_authorities(&self) -> u64 { + self.scores_per_authority.len() as u64 + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct CommittedSubDagShell { /// The sequence of committed certificates' digests. pub leader: CertificateDigest, /// Sequence number of the CommittedSubDag pub sub_dag_index: SequenceNumber, + /// The reputation score calculated so far for the nodes + pub reputation_score: ReputationScores, } impl CommittedSubDagShell { @@ -79,6 +109,7 @@ impl CommittedSubDagShell { certificates: sub_dag.certificates.iter().map(|x| x.digest()).collect(), leader: sub_dag.leader.digest(), sub_dag_index: sub_dag.sub_dag_index, + reputation_score: sub_dag.reputation_score.clone(), } } } @@ -150,6 +181,16 @@ impl ConsensusStore { s } + /// Returns the latest committed sub dag. If none has been committed yet, then + /// None is returned instead. + pub fn get_latest_sub_dag(&self) -> Option<CommittedSubDagShell> { + self.committed_sub_dags_by_index + .iter() + .skip_to_last() + .next() + .map(|(_, subdag)| subdag) + } + /// Load all the sub dags committed with sequence number of at least `from`. pub fn read_committed_sub_dags_from( &self,
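
A minimal, self-contained sketch of the scoring behaviour that `update_reputation_score` introduces in this patch, assuming a trimmed-down `Scores` type with plain string authority names instead of `PublicKey`. Only the window arithmetic (`% num_sub_dags_per_schedule` for the reset, `(sub_dag_index + 1) % num_sub_dags_per_schedule` for the final flag) mirrors the patch; everything else is illustrative.

use std::collections::HashMap;

// Simplified stand-in for `ReputationScores`: authority keys are plain
// strings here instead of `PublicKey`.
#[derive(Clone, Debug, Default)]
struct Scores {
    scores_per_authority: HashMap<String, u64>,
    final_of_schedule: bool,
}

impl Scores {
    fn add_score(&mut self, authority: &str, score: u64) {
        *self
            .scores_per_authority
            .entry(authority.to_string())
            .or_insert(0) += score;
    }
}

// Mirrors the shape of `update_reputation_score`: reset at the start of a
// schedule window, +1 for every authority that voted for the previous
// leader, and flag the last update of the window.
fn update(
    state: &mut Scores,
    voters_for_previous_leader: &[&str],
    sub_dag_index: u64,
    num_sub_dags_per_schedule: u64,
) -> Scores {
    if sub_dag_index % num_sub_dags_per_schedule == 0 {
        *state = Scores::default();
    }
    for authority in voters_for_previous_leader {
        state.add_score(authority, 1);
    }
    state.final_of_schedule = (sub_dag_index + 1) % num_sub_dags_per_schedule == 0;
    state.clone()
}

fn main() {
    let mut state = Scores::default();
    let authorities = ["a", "b", "c", "d"];
    // With a window of 5, the scores reset at sub-dag indices 5 and 10, and the
    // snapshots taken at indices 4 and 9 carry `final_of_schedule == true`.
    for sub_dag_index in 1u64..=10 {
        // The very first commit has no previous leader, so nobody is credited.
        let voters: &[&str] = if sub_dag_index == 1 { &[] } else { &authorities };
        let snapshot = update(&mut state, voters, sub_dag_index, 5);
        println!("{sub_dag_index}: {snapshot:?}");
    }
}

Run with a window of 5, the printed progression matches what the `reset_consensus_scores_on_every_schedule_change` test asserts: no scores on the first commit, +1 per authority per commit afterwards, and a reset back to 1 on every fifth commit.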
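
How a "vote" for the previous leader is detected: a certificate counts as a vote when the previous leader's digest appears among its header's parents. A reduced sketch of that check, with raw byte arrays and strings as stand-ins for `CertificateDigest` and `PublicKey`:

use std::collections::BTreeSet;

// Reduced certificate: just its author and the digests of its parents.
struct Cert {
    origin: String,
    parents: BTreeSet<[u8; 32]>,
}

// A certificate counts as a vote for the previous leader when the leader's
// digest appears among the certificate's parents; this is the check that
// `update_reputation_score` runs over the committed sequence.
fn voters_for<'a>(sequence: &'a [Cert], previous_leader: &[u8; 32]) -> Vec<&'a str> {
    sequence
        .iter()
        .filter(|cert| cert.parents.contains(previous_leader))
        .map(|cert| cert.origin.as_str())
        .collect()
}

fn main() {
    let leader_digest = [7u8; 32];
    let certs = vec![
        Cert {
            origin: "authority-a".into(),
            parents: BTreeSet::from([leader_digest, [3u8; 32]]),
        },
        Cert {
            origin: "authority-b".into(),
            parents: BTreeSet::from([[9u8; 32]]),
        },
    ];
    // Only authority-a referenced the previous leader, so only it gets +1.
    assert_eq!(voters_for(&certs, &leader_digest), vec!["authority-a"]);
}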
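
The recovery path in `ConsensusState::new_from_store` folds the optional latest `CommittedSubDagShell` into `(latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader)`, defaulting all three when the store is empty. A small sketch of that pattern under simplified, hypothetical types:

// Reduced stand-ins for the types used at recovery: the shell keeps only the
// fields the recovery needs, and the digest is a raw byte array.
#[derive(Clone, Debug, Default, PartialEq)]
struct Scores {
    total_votes: u64,
}

struct Shell {
    sub_dag_index: u64,
    reputation_score: Scores,
    leader: [u8; 32],
}

// The recovery pattern: if no sub dag has ever been committed, fall back to
// index 0, default scores and no previous leader.
fn recover(latest_sub_dag: Option<Shell>) -> (u64, Scores, Option<[u8; 32]>) {
    latest_sub_dag
        .map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader)))
        .unwrap_or_default()
}

fn main() {
    // Fresh store: everything starts from the defaults.
    let (index, scores, leader) = recover(None);
    assert_eq!((index, scores, leader), (0, Scores::default(), None));

    // Restart after a crash: state resumes from the persisted shell.
    let (index, _scores, leader) = recover(Some(Shell {
        sub_dag_index: 7,
        reputation_score: Scores { total_votes: 4 },
        leader: [1u8; 32],
    }));
    assert_eq!(index, 7);
    assert!(leader.is_some());
}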
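
Finally, `final_of_schedule` is aimed at downstream consumers that want the last scores of a schedule window. Since the schedule change itself is still a TODO in this patch, the consumer below is purely hypothetical and only shows when such a snapshot would be taken:

use std::collections::HashMap;

// Simplified stand-in for `ReputationScores`.
#[derive(Clone, Debug, Default)]
struct Scores {
    scores_per_authority: HashMap<String, u64>,
    final_of_schedule: bool,
}

// A hypothetical consumer: it ignores intermediate score updates and only
// snapshots the scores that close a schedule window, which is what
// `final_of_schedule` is meant to signal.
#[derive(Default)]
struct ScheduleTracker {
    last_final_scores: Option<Scores>,
}

impl ScheduleTracker {
    fn observe_commit(&mut self, scores: &Scores) {
        if scores.final_of_schedule {
            self.last_final_scores = Some(scores.clone());
        }
    }
}

fn main() {
    let mut tracker = ScheduleTracker::default();

    // An intermediate update is ignored...
    let mut scores = Scores::default();
    scores.scores_per_authority.insert("authority-a".into(), 2);
    tracker.observe_commit(&scores);
    assert!(tracker.last_final_scores.is_none());

    // ...while the closing update of the window is kept for the next schedule.
    scores.final_of_schedule = true;
    tracker.observe_commit(&scores);
    assert!(tracker.last_final_scores.is_some());
}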