diff --git a/crates/sui-core/src/authority/epoch_start_configuration.rs b/crates/sui-core/src/authority/epoch_start_configuration.rs index be3de4146943d..1d961c91d7246 100644 --- a/crates/sui-core/src/authority/epoch_start_configuration.rs +++ b/crates/sui-core/src/authority/epoch_start_configuration.rs @@ -5,7 +5,7 @@ use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use sui_types::epoch_data::EpochData; -use sui_types::messages_checkpoint::CheckpointDigest; +use sui_types::messages_checkpoint::{CheckpointDigest, CheckpointTimestamp}; use sui_types::sui_system_state::epoch_start_sui_system_state::{ EpochStartSystemState, EpochStartSystemStateTrait, }; @@ -42,6 +42,10 @@ impl EpochStartConfiguration { self.epoch_digest(), ) } + + pub fn epoch_start_timestamp_ms(&self) -> CheckpointTimestamp { + self.epoch_start_state().epoch_start_timestamp_ms() + } } #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index f5671dc40f3c3..74993d0783e21 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -29,7 +29,6 @@ use sui_types::messages::{ ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind, VerifiedExecutableTransaction, VerifiedTransaction, }; - use sui_types::storage::ParentSync; use tracing::{debug, error, instrument}; @@ -131,8 +130,22 @@ impl ExecutionState for ConsensusHandler { /* (serialized, transaction, output_cert) */ let mut transactions = vec![]; - // Narwhal enforces some invariants on the header.created_at, so we can use it as a timestamp - let timestamp = *consensus_output.sub_dag.leader.header().created_at(); + let timestamp = consensus_output.sub_dag.commit_timestamp(); + let leader_author = consensus_output.sub_dag.leader.header().author(); + + let epoch_start = self + .epoch_store + .epoch_start_config() + .epoch_start_timestamp_ms(); + let timestamp = if timestamp < epoch_start { + error!( + "Unexpected commit timestamp {timestamp} less than epoch start time {epoch_start}, author {leader_author}, round {round}", + + ); + epoch_start + } else { + timestamp + }; let prologue_transaction = self.consensus_commit_prologue_transaction(round, timestamp); transactions.push(( @@ -152,12 +165,7 @@ impl ExecutionState for ConsensusHandler { self.metrics .consensus_committed_subdags - .with_label_values(&[&consensus_output - .sub_dag - .leader - .header() - .author() - .to_string()]) + .with_label_values(&[&leader_author.to_string()]) .inc(); for (cert, batches) in consensus_output.batches { let author = cert.header().author(); diff --git a/narwhal/consensus/benches/process_certificates.rs b/narwhal/consensus/benches/process_certificates.rs index 5ad5f20527ff0..93f3ed288207a 100644 --- a/narwhal/consensus/benches/process_certificates.rs +++ b/narwhal/consensus/benches/process_certificates.rs @@ -14,6 +14,7 @@ use narwhal_consensus as consensus; use pprof::criterion::{Output, PProfProfiler}; use prometheus::Registry; use std::{collections::BTreeSet, sync::Arc}; +use storage::NodeStorage; use test_utils::{make_consensus_store, make_optimal_certificates, temp_dir, CommitteeFixture}; use tokio::time::Instant; use types::{Certificate, Round}; @@ -41,10 +42,10 @@ pub fn process_certificates(c: &mut Criterion) { make_optimal_certificates(&committee, 1..=rounds, &genesis, &keys); let store_path = temp_dir(); - let store = make_consensus_store(&store_path); + let store = NodeStorage::reopen(&store_path, 
None); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth); + let mut state = ConsensusState::new(metrics.clone(), gc_depth); let data_size: usize = certificates .iter() @@ -54,7 +55,7 @@ pub fn process_certificates(c: &mut Criterion) { let mut ordering_engine = Bullshark { committee: committee.clone(), - store, + store: store.consensus_store, metrics, last_successful_leader_election_timestamp: Instant::now(), last_leader_election: Default::default(), diff --git a/narwhal/consensus/src/bullshark.rs b/narwhal/consensus/src/bullshark.rs index 6f328cc0ac29d..b27e63cc0a9be 100644 --- a/narwhal/consensus/src/bullshark.rs +++ b/narwhal/consensus/src/bullshark.rs @@ -9,11 +9,12 @@ use crate::{ use config::{AuthorityIdentifier, Committee, Stake}; use fastcrypto::hash::Hash; use std::sync::Arc; +use storage::ConsensusStore; use tokio::time::Instant; use tracing::{debug, error_span}; use types::{ - Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI, - ReputationScores, Round, + Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, HeaderAPI, ReputationScores, + Round, }; #[cfg(test)] @@ -155,7 +156,7 @@ impl ConsensusProtocol for Bullshark { .iter() .rev() { - let sub_dag_index = state.latest_sub_dag_index + 1; + let sub_dag_index = state.next_sub_dag_index(); let _span = error_span!("bullshark_process_sub_dag", sub_dag_index); debug!("Leader {:?} has enough support", leader); @@ -178,23 +179,23 @@ impl ConsensusProtocol for Bullshark { total_committed_certificates += sequence.len(); - // We update the reputation score stored in state - let reputation_score = self.update_reputation_score(state, &sequence, sub_dag_index); + // We resolve the reputation score that should be stored alongside with this sub dag. + let reputation_score = self.resolve_reputation_score(state, &sequence, sub_dag_index); - let sub_dag = CommittedSubDag { - certificates: sequence, - leader: leader.clone(), + let sub_dag = CommittedSubDag::new( + sequence, + leader.clone(), sub_dag_index, reputation_score, - }; + state.last_committed_sub_dag.as_ref(), + ); // Persist the update. self.store .write_consensus_state(&state.last_committed, &sub_dag)?; - // Increase the global consensus index. - state.latest_sub_dag_index = sub_dag_index; - state.last_committed_leader = Some(sub_dag.leader.digest()); + // Update the last sub dag + state.last_committed_sub_dag = Some(sub_dag.clone()); committed_sub_dags.push(sub_dag); } @@ -302,52 +303,59 @@ impl Bullshark { dag.get(&round).and_then(|x| x.get(&leader)) } - /// Updates and calculates the reputation score for the current commit managing any internal state. - /// It returns the updated reputation score. - fn update_reputation_score( - &mut self, + /// Calculates the reputation score for the current commit by taking into account the reputation + /// scores from the previous commit (assuming that exists). It returns the updated reputation score. + fn resolve_reputation_score( + &self, state: &mut ConsensusState, committed_sequence: &[Certificate], sub_dag_index: u64, ) -> ReputationScores { - // we reset the scores for every schedule change window. + // we reset the scores for every schedule change window, or initialise when it's the first + // sub dag we are going to create. // TODO: when schedule change is implemented we should probably change a little bit // this logic here. 
- if sub_dag_index % self.num_sub_dags_per_schedule == 0 { - state.last_consensus_reputation_score = ReputationScores::new(&self.committee) - } + let mut reputation_score = + if sub_dag_index == 1 || sub_dag_index % self.num_sub_dags_per_schedule == 0 { + ReputationScores::new(&self.committee) + } else { + state + .last_committed_sub_dag + .as_ref() + .expect("Committed sub dag should always exist for sub_dag_index > 1") + .reputation_score + .clone() + }; // update the score for the previous leader. If no previous leader exists, // then this is the first time we commit a leader, so no score update takes place - if let Some(previous_leader) = state.last_committed_leader { + if let Some(last_committed_sub_dag) = state.last_committed_sub_dag.as_ref() { for certificate in committed_sequence { // TODO: we could iterate only the certificates of the round above the previous leader's round if certificate .header() .parents() .iter() - .any(|digest| *digest == previous_leader) + .any(|digest| *digest == last_committed_sub_dag.leader.digest()) { - state - .last_consensus_reputation_score - .add_score(certificate.origin(), 1); + reputation_score.add_score(certificate.origin(), 1); } } } - // we check if this is the last subdag of the current schedule. If yes then we mark the + // we check if this is the last sub dag of the current schedule. If yes then we mark the // scores as final_of_schedule = true so any downstream user can now that those are the last // ones calculated for the current schedule. - state.last_consensus_reputation_score.final_of_schedule = + reputation_score.final_of_schedule = (sub_dag_index + 1) % self.num_sub_dags_per_schedule == 0; // Always ensure that all the authorities are present in the reputation scores - even // when score is zero. assert_eq!( - state.last_consensus_reputation_score.total_authorities() as usize, + reputation_score.total_authorities() as usize, self.committee.size() ); - state.last_consensus_reputation_score.clone() + reputation_score } } diff --git a/narwhal/consensus/src/consensus.rs b/narwhal/consensus/src/consensus.rs index c7db824cd3daf..fefe65a3ae4b6 100644 --- a/narwhal/consensus/src/consensus.rs +++ b/narwhal/consensus/src/consensus.rs @@ -14,13 +14,12 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, sync::Arc, }; -use storage::CertificateStore; +use storage::{CertificateStore, ConsensusStore}; use tokio::{sync::watch, task::JoinHandle}; use tracing::{debug, info, instrument}; use types::{ metered_channel, Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, - CommittedSubDagShell, ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI, - ReputationScores, Round, Timestamp, + ConditionalBroadcastReceiver, ConsensusCommit, HeaderAPI, Round, Timestamp, }; #[cfg(test)] @@ -39,13 +38,8 @@ pub struct ConsensusState { /// Keeps the last committed round for each authority. This map is used to clean up the dag and /// ensure we don't commit twice the same certificate. pub last_committed: HashMap, - /// Used to populate the index in the sub-dag construction. - pub latest_sub_dag_index: SequenceNumber, - /// The last calculated consensus reputation score - pub last_consensus_reputation_score: ReputationScores, - /// The last committed sub dag leader. This allow us to calculate the reputation score of the nodes - /// that vote for the last leader. - pub last_committed_leader: Option, + /// The last committed sub dag. If value is None, it means that we haven't committed any sub dag yet. 
+ pub last_committed_sub_dag: Option, /// Keeps the latest committed certificate (and its parents) for every authority. Anything older /// must be regularly cleaned up through the function `update`. pub dag: Dag, @@ -54,15 +48,13 @@ pub struct ConsensusState { } impl ConsensusState { - pub fn new(metrics: Arc, committee: &Committee, gc_depth: Round) -> Self { + pub fn new(metrics: Arc, gc_depth: Round) -> Self { Self { last_round: ConsensusRound::default(), gc_depth, last_committed: Default::default(), - latest_sub_dag_index: 0, dag: Default::default(), - last_consensus_reputation_score: ReputationScores::new(committee), - last_committed_leader: None, + last_committed_sub_dag: None, metrics, } } @@ -72,32 +64,50 @@ impl ConsensusState { last_committed_round: Round, gc_depth: Round, recovered_last_committed: HashMap, - latest_sub_dag: Option, + latest_sub_dag: Option, cert_store: CertificateStore, - committee: &Committee, ) -> Self { let last_round = ConsensusRound::new_with_gc_depth(last_committed_round, gc_depth); let dag = Self::construct_dag_from_cert_store( - cert_store, + &cert_store, &recovered_last_committed, last_round.gc_round, ) .expect("error when recovering DAG from store"); metrics.recovered_consensus_state.inc(); - let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) = - latest_sub_dag - .map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader))) - .unwrap_or((0, ReputationScores::new(committee), None)); + let last_committed_sub_dag = if let Some(latest_sub_dag) = latest_sub_dag.as_ref() { + let certificates = latest_sub_dag + .certificates() + .iter() + .map(|s| { + cert_store + .read(*s) + .unwrap() + .expect("Certificate should be found in database") + }) + .collect(); + + let leader = cert_store + .read(latest_sub_dag.leader()) + .unwrap() + .expect("Certificate should be found in database"); + + Some(CommittedSubDag::from_commit( + latest_sub_dag.clone(), + certificates, + leader, + )) + } else { + None + }; Self { gc_depth, last_round, last_committed: recovered_last_committed, - last_consensus_reputation_score, - latest_sub_dag_index, - last_committed_leader, + last_committed_sub_dag, dag, metrics, } @@ -105,7 +115,7 @@ impl ConsensusState { #[instrument(level = "info", skip_all)] pub fn construct_dag_from_cert_store( - cert_store: CertificateStore, + cert_store: &CertificateStore, last_committed: &HashMap, gc_round: Round, ) -> Result { @@ -229,6 +239,15 @@ impl ConsensusState { panic!("Parent round not found in DAG for {certificate:?}!"); } } + + /// Provides the next index to be used for the next produced sub dag + pub fn next_sub_dag_index(&self) -> SequenceNumber { + self.last_committed_sub_dag + .as_ref() + .map(|s| s.sub_dag_index) + .unwrap_or_default() + + 1 + } } /// Describe how to sequence input certificates. 
@@ -336,9 +355,11 @@ where let latest_sub_dag = store.get_latest_sub_dag(); if let Some(sub_dag) = &latest_sub_dag { assert_eq!( - sub_dag.leader_round, last_committed_round, + sub_dag.leader_round(), + last_committed_round, "Last subdag leader round {} is not equal to the last committed round {}!", - sub_dag.leader_round, last_committed_round, + sub_dag.leader_round(), + last_committed_round, ); } @@ -349,7 +370,6 @@ where recovered_last_committed, latest_sub_dag, cert_store, - &committee, ); tx_consensus_round_updates diff --git a/narwhal/consensus/src/tests/bullshark_tests.rs b/narwhal/consensus/src/tests/bullshark_tests.rs index 16168498cdaea..971ebe32f8929 100644 --- a/narwhal/consensus/src/tests/bullshark_tests.rs +++ b/narwhal/consensus/src/tests/bullshark_tests.rs @@ -572,7 +572,7 @@ async fn delayed_certificates_are_rejected() { test_utils::make_certificates_with_epoch(&committee, 1..=5, epoch, &genesis, &ids); let store = make_consensus_store(&test_utils::temp_dir()); - let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth); + let mut state = ConsensusState::new(metrics.clone(), gc_depth); let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); // Populate DAG with the rounds up to round 5 so we trigger commits @@ -618,7 +618,7 @@ async fn submitting_equivocating_certificate_should_error() { test_utils::make_certificates_with_epoch(&committee, 1..=1, epoch, &genesis, &ids); let store = make_consensus_store(&test_utils::temp_dir()); - let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth); + let mut state = ConsensusState::new(metrics.clone(), gc_depth); let mut bullshark = Bullshark::new(committee.clone(), store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); @@ -676,7 +676,7 @@ async fn reset_consensus_scores_on_every_schedule_change() { test_utils::make_certificates_with_epoch(&committee, 1..=50, epoch, &genesis, &ids); let store = make_consensus_store(&test_utils::temp_dir()); - let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth); + let mut state = ConsensusState::new(metrics.clone(), gc_depth); let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); // Populate DAG with the rounds up to round 50 so we trigger commits @@ -853,7 +853,7 @@ async fn garbage_collection_basic() { let store = make_consensus_store(&test_utils::temp_dir()); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH); + let mut state = ConsensusState::new(metrics.clone(), GC_DEPTH); let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); // Now start feeding the certificates per round @@ -953,7 +953,7 @@ async fn slow_node() { // Create Bullshark consensus engine let store = make_consensus_store(&test_utils::temp_dir()); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH); + let mut state = ConsensusState::new(metrics.clone(), GC_DEPTH); let mut bullshark = Bullshark::new(committee.clone(), store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); @@ -1126,7 +1126,7 @@ async fn not_enough_support_and_missing_leaders_and_gc() { // Create Bullshark consensus engine let store = make_consensus_store(&test_utils::temp_dir()); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH); + let mut state = 
ConsensusState::new(metrics.clone(), GC_DEPTH); let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE); let mut committed = false; diff --git a/narwhal/consensus/src/tests/consensus_utils.rs b/narwhal/consensus/src/tests/consensus_utils.rs index 92aee9916ec09..c21ac2df8e5e3 100644 --- a/narwhal/consensus/src/tests/consensus_utils.rs +++ b/narwhal/consensus/src/tests/consensus_utils.rs @@ -3,11 +3,11 @@ use std::num::NonZeroUsize; // SPDX-License-Identifier: Apache-2.0 use config::AuthorityIdentifier; use std::sync::Arc; -use storage::{CertificateStore, CertificateStoreCache}; +use storage::{CertificateStore, CertificateStoreCache, ConsensusStore}; use store::rocks::MetricConf; use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions}; use types::{ - Certificate, CertificateDigest, CommittedSubDagShell, ConsensusStore, Round, SequenceNumber, + Certificate, CertificateDigest, CommittedSubDagShell, ConsensusCommit, Round, SequenceNumber, }; pub(crate) const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; @@ -15,21 +15,27 @@ pub(crate) const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; pub fn make_consensus_store(store_path: &std::path::Path) -> Arc { const LAST_COMMITTED_CF: &str = "last_committed"; const SEQUENCE_CF: &str = "sequence"; + const COMMITTED_SUB_DAG_CF: &str = "committed_sub_dag"; let rocksdb = rocks::open_cf( store_path, None, MetricConf::default(), - &[LAST_COMMITTED_CF, SEQUENCE_CF], + &[LAST_COMMITTED_CF, SEQUENCE_CF, COMMITTED_SUB_DAG_CF], ) .expect("Failed to create database"); - let (last_committed_map, sequence_map) = reopen!(&rocksdb, + let (last_committed_map, sequence_map, committed_sub_dag_map) = reopen!(&rocksdb, LAST_COMMITTED_CF;, - SEQUENCE_CF; + SEQUENCE_CF;, + COMMITTED_SUB_DAG_CF; ); - Arc::new(ConsensusStore::new(last_committed_map, sequence_map)) + Arc::new(ConsensusStore::new( + last_committed_map, + sequence_map, + committed_sub_dag_map, + )) } pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore { diff --git a/narwhal/consensus/src/tests/randomized_tests.rs b/narwhal/consensus/src/tests/randomized_tests.rs index 596c45c745c09..7e2e84c9b0098 100644 --- a/narwhal/consensus/src/tests/randomized_tests.rs +++ b/narwhal/consensus/src/tests/randomized_tests.rs @@ -423,7 +423,7 @@ fn generate_and_run_execution_plans( // Now create a new Bullshark engine let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth); + let mut state = ConsensusState::new(metrics.clone(), gc_depth); let mut bullshark = Bullshark::new( committee.clone(), store.clone(), diff --git a/narwhal/consensus/src/tusk.rs b/narwhal/consensus/src/tusk.rs index 726fc70bf2e31..68ba76b3e2656 100644 --- a/narwhal/consensus/src/tusk.rs +++ b/narwhal/consensus/src/tusk.rs @@ -8,10 +8,11 @@ use crate::{ use config::{Committee, Stake}; use fastcrypto::hash::Hash; use std::{collections::HashMap, sync::Arc}; +use storage::ConsensusStore; use tracing::{debug, error_span}; use types::{ - Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI, - ReputationScores, Round, + Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, HeaderAPI, ReputationScores, + Round, }; #[cfg(any(test))] @@ -90,7 +91,7 @@ impl ConsensusProtocol for Tusk { .iter() .rev() { - let sub_dag_index = state.latest_sub_dag_index + 1; + let sub_dag_index = state.next_sub_dag_index(); let _span = error_span!("tusk_process_sub_dag", sub_dag_index); let mut sequence = 
Vec::new(); @@ -105,19 +106,21 @@ impl ConsensusProtocol for Tusk { } debug!("Subdag has {} certificates", sequence.len()); - let sub_dag = CommittedSubDag { - certificates: sequence, - leader: leader.clone(), + // TODO compute the scores for Tusk as well + let sub_dag = CommittedSubDag::new( + sequence, + leader.clone(), sub_dag_index, - reputation_score: ReputationScores::default(), // TODO compute the scores for Tusk as well - }; + ReputationScores::default(), + None, + ); // Persist the update. self.store .write_consensus_state(&state.last_committed, &sub_dag)?; // Increase the global consensus index. - state.latest_sub_dag_index = sub_dag_index; + state.last_committed_sub_dag = Some(sub_dag.clone()); committed_sub_dags.push(sub_dag); } @@ -179,7 +182,8 @@ mod tests { use prometheus::Registry; use rand::Rng; use std::collections::BTreeSet; - use test_utils::{make_consensus_store, CommitteeFixture}; + use storage::NodeStorage; + use test_utils::CommitteeFixture; use types::Certificate; #[tokio::test] @@ -200,12 +204,12 @@ mod tests { test_utils::make_optimal_certificates(&committee, 1..=rounds, &genesis, &keys); let store_path = test_utils::temp_dir(); - let store = make_consensus_store(&store_path); + let store = NodeStorage::reopen(&store_path, None); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics, &committee, gc_depth); - let mut tusk = Tusk::new(committee, store, gc_depth); + let mut state = ConsensusState::new(metrics, gc_depth); + let mut tusk = Tusk::new(committee, store.consensus_store, gc_depth); for certificate in certificates { tusk.process_certificate(&mut state, certificate).unwrap(); } @@ -242,15 +246,19 @@ mod tests { // TODO: evidence that this test fails when `failure_probability` parameter >= 1/3 let (certificates, _next_parents) = test_utils::make_certificates(&committee, 1..=rounds, &genesis, &keys, 0.333); - let arc_committee = Arc::new(ArcSwap::from_pointee(committee.clone())); + let arc_committee = Arc::new(ArcSwap::from_pointee(committee)); let store_path = test_utils::temp_dir(); - let store = make_consensus_store(&store_path); + let store = NodeStorage::reopen(&store_path, None); let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let mut state = ConsensusState::new(metrics, &committee, gc_depth); - let mut tusk = Tusk::new((**arc_committee.load()).clone(), store, gc_depth); + let mut state = ConsensusState::new(metrics, gc_depth); + let mut tusk = Tusk::new( + (**arc_committee.load()).clone(), + store.consensus_store, + gc_depth, + ); for certificate in certificates { tusk.process_certificate(&mut state, certificate).unwrap(); diff --git a/narwhal/executor/src/lib.rs b/narwhal/executor/src/lib.rs index 6149a49e64e64..b4bafb49627b6 100644 --- a/narwhal/executor/src/lib.rs +++ b/narwhal/executor/src/lib.rs @@ -18,12 +18,12 @@ use mockall::automock; use network::client::NetworkClient; use prometheus::Registry; use std::sync::Arc; -use storage::CertificateStore; +use storage::{CertificateStore, ConsensusStore}; use tokio::task::JoinHandle; use tracing::info; use types::{ metered_channel, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver, - ConsensusOutput, ConsensusStore, + ConsensusOutput, }; /// Convenience type representing a serialized transaction. 
@@ -103,8 +103,7 @@ pub async fn get_restored_consensus_output( let mut sub_dags = Vec::new(); for compressed_sub_dag in compressed_sub_dags { - let sub_dag_index = compressed_sub_dag.sub_dag_index; - let certificate_digests: Vec = compressed_sub_dag.certificates; + let certificate_digests: Vec = compressed_sub_dag.certificates(); let certificates = certificate_store .read_all(certificate_digests)? @@ -112,14 +111,15 @@ pub async fn get_restored_consensus_output( .flatten() .collect(); - let leader = certificate_store.read(compressed_sub_dag.leader)?.unwrap(); + let leader = certificate_store + .read(compressed_sub_dag.leader())? + .unwrap(); - sub_dags.push(CommittedSubDag { + sub_dags.push(CommittedSubDag::from_commit( + compressed_sub_dag, certificates, leader, - sub_dag_index, - reputation_score: compressed_sub_dag.reputation_score, - }); + )); } Ok(sub_dags) diff --git a/narwhal/storage/src/certificate_store.rs b/narwhal/storage/src/certificate_store.rs index d61c528b10927..5f3d44218db49 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -11,13 +11,14 @@ use std::{cmp::Ordering, collections::BTreeMap, iter}; use sui_macros::fail_point; use tap::Tap; +use crate::StoreResult; use config::AuthorityIdentifier; use mysten_common::sync::notify_read::NotifyRead; use store::{ rocks::{DBMap, TypedStoreError::RocksDBError}, Map, }; -use types::{Certificate, CertificateDigest, Round, StoreResult}; +use types::{Certificate, CertificateDigest, Round}; #[derive(Clone)] pub struct CertificateStoreCacheMetrics { diff --git a/narwhal/storage/src/consensus_store.rs b/narwhal/storage/src/consensus_store.rs new file mode 100644 index 0000000000000..2db891684235f --- /dev/null +++ b/narwhal/storage/src/consensus_store.rs @@ -0,0 +1,222 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{NodeStorage, StoreResult}; +use config::AuthorityIdentifier; +use std::collections::HashMap; +use store::rocks::{open_cf, DBMap, MetricConf, ReadWriteOptions}; +use store::{reopen, Map, TypedStoreError}; +use types::{ + CommittedSubDag, CommittedSubDagShell, ConsensusCommit, ConsensusCommitV2, Round, + SequenceNumber, +}; + +/// The persistent storage of the sequencer. +pub struct ConsensusStore { + /// The latest committed round of each validator. + last_committed: DBMap, + /// TODO: remove once released to validators + /// The global consensus sequence. + committed_sub_dags_by_index: DBMap, + /// The global consensus sequence + committed_sub_dags_by_index_v2: DBMap, +} + +impl ConsensusStore { + /// Create a new consensus store structure by using already loaded maps. + pub fn new( + last_committed: DBMap, + sequence: DBMap, + committed_sub_dags_map: DBMap, + ) -> Self { + Self { + last_committed, + committed_sub_dags_by_index: sequence, + committed_sub_dags_by_index_v2: committed_sub_dags_map, + } + } + + pub fn new_for_tests() -> Self { + let rocksdb = open_cf( + tempfile::tempdir().unwrap(), + None, + MetricConf::default(), + &[ + NodeStorage::LAST_COMMITTED_CF, + NodeStorage::SUB_DAG_INDEX_CF, + NodeStorage::COMMITTED_SUB_DAG_INDEX_CF, + ], + ) + .expect("Cannot open database"); + let (last_committed_map, sub_dag_index_map, committed_sub_dag_map) = reopen!(&rocksdb, NodeStorage::LAST_COMMITTED_CF;, NodeStorage::SUB_DAG_INDEX_CF;, NodeStorage::COMMITTED_SUB_DAG_INDEX_CF;); + Self::new(last_committed_map, sub_dag_index_map, committed_sub_dag_map) + } + + /// Clear the store. 
+ pub fn clear(&self) -> StoreResult<()> { + self.last_committed.clear()?; + self.committed_sub_dags_by_index.clear()?; + self.committed_sub_dags_by_index_v2.clear()?; + Ok(()) + } + + /// Persist the consensus state. + pub fn write_consensus_state( + &self, + last_committed: &HashMap<AuthorityIdentifier, Round>, + sub_dag: &CommittedSubDag, + ) -> Result<(), TypedStoreError> { + let commit = ConsensusCommit::V2(ConsensusCommitV2::from_sub_dag(sub_dag)); + + let mut write_batch = self.last_committed.batch(); + write_batch.insert_batch(&self.last_committed, last_committed.iter())?; + write_batch.insert_batch( + &self.committed_sub_dags_by_index_v2, + std::iter::once((sub_dag.sub_dag_index, commit)), + )?; + write_batch.write() + } + + /// Load the last committed round of each validator. + pub fn read_last_committed(&self) -> HashMap<AuthorityIdentifier, Round> { + self.last_committed.iter().collect() + } + + /// Gets the latest sub dag index from the store + pub fn get_latest_sub_dag_index(&self) -> SequenceNumber { + if let Some(s) = self + .committed_sub_dags_by_index_v2 + .iter() + .skip_to_last() + .next() + .map(|(seq, _)| seq) + { + return s; + } + + // TODO: remove once this has been released to the validators + // If nothing has been found in v2, just fall back to the previous storage + self.committed_sub_dags_by_index + .iter() + .skip_to_last() + .next() + .map(|(seq, _)| seq) + .unwrap_or_default() + } + + /// Returns the latest sub dag committed. If none is committed yet, then + /// None is returned instead. + pub fn get_latest_sub_dag(&self) -> Option<ConsensusCommit> { + if let Some(sub_dag) = self + .committed_sub_dags_by_index_v2 + .iter() + .skip_to_last() + .next() + .map(|(_, sub_dag)| sub_dag) + { + return Some(sub_dag); + } + + // TODO: remove once this has been released to the validators + // If nothing has been found in the v2 table, just fall back to the previous one. We expect this + // to happen only after the validator has upgraded. After that point the v2 table will be populated + // and an entry should be found there. + self.committed_sub_dags_by_index + .iter() + .skip_to_last() + .next() + .map(|(_, sub_dag)| ConsensusCommit::V1(sub_dag)) + } + + /// Load all the sub dags committed with sequence number of at least `from`. + pub fn read_committed_sub_dags_from( + &self, + from: &SequenceNumber, + ) -> StoreResult<Vec<ConsensusCommit>> { + // TODO: remove once this has been released to the validators + // start from the previous table first to ensure we haven't missed anything. + let mut sub_dags = self + .committed_sub_dags_by_index + .iter() + .skip_to(from)? + .map(|(_, sub_dag)| ConsensusCommit::V1(sub_dag)) + .collect::<Vec<ConsensusCommit>>(); + + sub_dags.extend( + self.committed_sub_dags_by_index_v2 + .iter() + .skip_to(from)? 
+ .map(|(_, sub_dag)| sub_dag) + .collect::>(), + ); + + Ok(sub_dags) + } +} + +#[cfg(test)] +mod test { + use crate::ConsensusStore; + use store::Map; + use types::{CommittedSubDagShell, ConsensusCommit, ConsensusCommitV2, TimestampMs}; + + #[tokio::test] + async fn test_v1_v2_backwards_compatibility() { + let store = ConsensusStore::new_for_tests(); + + // Create few sub dags of V1 and write in the committed_sub_dags_by_index storage + for i in 0..3 { + let s = CommittedSubDagShell { + certificates: vec![], + leader: Default::default(), + leader_round: 2, + sub_dag_index: i, + reputation_score: Default::default(), + }; + + store + .committed_sub_dags_by_index + .insert(&s.sub_dag_index, &s) + .unwrap(); + } + + // Create few sub dags of V2 and write in the committed_sub_dags_by_index_v2 storage + for i in 3..6 { + let s = ConsensusCommitV2 { + certificates: vec![], + leader: Default::default(), + leader_round: 2, + sub_dag_index: i, + reputation_score: Default::default(), + commit_timestamp: i, + }; + + store + .committed_sub_dags_by_index_v2 + .insert(&s.sub_dag_index.clone(), &ConsensusCommit::V2(s)) + .unwrap(); + } + + // Read from index 0, all the sub dags should be returned + let sub_dags = store.read_committed_sub_dags_from(&0).unwrap(); + + assert_eq!(sub_dags.len(), 6); + + for (index, sub_dag) in sub_dags.iter().enumerate() { + assert_eq!(sub_dag.sub_dag_index(), index as u64); + if index < 3 { + assert_eq!(sub_dag.commit_timestamp(), 0); + } else { + assert_eq!(sub_dag.commit_timestamp(), index as TimestampMs); + } + } + + // Read the last sub dag, and the sub dag with index 5 should be returned + let last_sub_dag = store.get_latest_sub_dag(); + assert_eq!(last_sub_dag.unwrap().sub_dag_index(), 5); + + // Read the last sub dag index + let index = store.get_latest_sub_dag_index(); + assert_eq!(index, 5); + } +} diff --git a/narwhal/storage/src/lib.rs b/narwhal/storage/src/lib.rs index 4b5620b5a0034..6ebc541d7bcc9 100644 --- a/narwhal/storage/src/lib.rs +++ b/narwhal/storage/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 mod certificate_store; +mod consensus_store; mod header_store; mod node_store; mod payload_store; @@ -9,8 +10,13 @@ mod proposer_store; mod vote_digest_store; pub use certificate_store::*; +pub use consensus_store::*; pub use header_store::*; pub use node_store::*; pub use payload_store::*; pub use proposer_store::*; +use store::TypedStoreError; pub use vote_digest_store::*; + +/// Convenience type to propagate store errors. 
+pub type StoreResult = Result; diff --git a/narwhal/storage/src/node_store.rs b/narwhal/storage/src/node_store.rs index 6beefb98dede4..24bb44964c055 100644 --- a/narwhal/storage/src/node_store.rs +++ b/narwhal/storage/src/node_store.rs @@ -4,8 +4,8 @@ use crate::payload_store::PayloadStore; use crate::proposer_store::ProposerKey; use crate::vote_digest_store::VoteDigestStore; use crate::{ - CertificateStore, CertificateStoreCache, CertificateStoreCacheMetrics, HeaderStore, - ProposerStore, + CertificateStore, CertificateStoreCache, CertificateStoreCacheMetrics, ConsensusStore, + HeaderStore, ProposerStore, }; use config::{AuthorityIdentifier, WorkerId}; use std::num::NonZeroUsize; @@ -16,7 +16,7 @@ use store::reopen; use store::rocks::DBMap; use store::rocks::{open_cf, MetricConf, ReadWriteOptions}; use types::{ - Batch, BatchDigest, Certificate, CertificateDigest, CommittedSubDagShell, ConsensusStore, + Batch, BatchDigest, Certificate, CertificateDigest, CommittedSubDagShell, ConsensusCommit, Header, HeaderDigest, Round, SequenceNumber, VoteInfo, }; @@ -47,6 +47,7 @@ impl NodeStorage { pub(crate) const BATCHES_CF: &'static str = "batches"; pub(crate) const LAST_COMMITTED_CF: &'static str = "last_committed"; pub(crate) const SUB_DAG_INDEX_CF: &'static str = "sub_dag"; + pub(crate) const COMMITTED_SUB_DAG_INDEX_CF: &'static str = "committed_sub_dag"; // 100 nodes * 60 rounds (assuming 1 round/sec this will hold data for about the last 1 minute // which should be more than enough for advancing the protocol and also help other nodes) @@ -75,6 +76,7 @@ impl NodeStorage { Self::BATCHES_CF, Self::LAST_COMMITTED_CF, Self::SUB_DAG_INDEX_CF, + Self::COMMITTED_SUB_DAG_INDEX_CF, ], ) .expect("Cannot open database"); @@ -90,6 +92,7 @@ impl NodeStorage { batch_map, last_committed_map, sub_dag_index_map, + committed_sub_dag_map, ) = reopen!(&rocksdb, Self::LAST_PROPOSED_CF;, Self::VOTES_CF;, @@ -100,7 +103,8 @@ impl NodeStorage { Self::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>, Self::BATCHES_CF;, Self::LAST_COMMITTED_CF;, - Self::SUB_DAG_INDEX_CF; + Self::SUB_DAG_INDEX_CF;, + Self::COMMITTED_SUB_DAG_INDEX_CF; ); let proposer_store = ProposerStore::new(last_proposed_map); @@ -119,7 +123,11 @@ impl NodeStorage { ); let payload_store = PayloadStore::new(payload_map); let batch_store = batch_map; - let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sub_dag_index_map)); + let consensus_store = Arc::new(ConsensusStore::new( + last_committed_map, + sub_dag_index_map, + committed_sub_dag_map, + )); Self { proposer_store, diff --git a/narwhal/storage/src/proposer_store.rs b/narwhal/storage/src/proposer_store.rs index 5d86be0f52e1e..de2e6abab5293 100644 --- a/narwhal/storage/src/proposer_store.rs +++ b/narwhal/storage/src/proposer_store.rs @@ -1,10 +1,11 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use crate::StoreResult; use store::rocks::{open_cf, MetricConf}; use store::{reopen, rocks::DBMap, rocks::ReadWriteOptions, Map}; use sui_macros::fail_point; -use types::{Header, StoreResult}; +use types::Header; pub type ProposerKey = u32; diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 26e3da25e4943..11a9cfef52b2a 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -28,22 +28,21 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, num::NonZeroUsize, ops::RangeInclusive, - sync::Arc, }; +use store::rocks::DBMap; use store::rocks::MetricConf; use store::rocks::ReadWriteOptions; -use store::{reopen, rocks, rocks::DBMap}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::info; use types::{ - Batch, BatchDigest, Certificate, CertificateAPI, CertificateDigest, CommittedSubDagShell, - ConsensusStore, FetchBatchesRequest, FetchBatchesResponse, FetchCertificatesRequest, - FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, HeaderAPI, - HeaderV1Builder, PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryToPrimary, + Batch, BatchDigest, Certificate, CertificateAPI, CertificateDigest, FetchBatchesRequest, + FetchBatchesResponse, FetchCertificatesRequest, FetchCertificatesResponse, + GetCertificatesRequest, GetCertificatesResponse, Header, HeaderAPI, HeaderV1Builder, + PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, RequestBatchesRequest, RequestBatchesResponse, RequestVoteRequest, - RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, SequenceNumber, - TimestampMs, Transaction, Vote, VoteAPI, WorkerBatchMessage, WorkerDeleteBatchesMessage, + RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, TimestampMs, + Transaction, Vote, VoteAPI, WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer, }; @@ -129,27 +128,6 @@ pub fn random_key() -> KeyPair { //////////////////////////////////////////////////////////////// /// Headers, Votes, Certificates //////////////////////////////////////////////////////////////// - -pub fn make_consensus_store(store_path: &std::path::Path) -> Arc { - const LAST_COMMITTED_CF: &str = "last_committed"; - const SEQUENCE_CF: &str = "sequence"; - - let rocksdb = rocks::open_cf( - store_path, - None, - MetricConf::default(), - &[LAST_COMMITTED_CF, SEQUENCE_CF], - ) - .expect("Failed creating database"); - - let (last_committed_map, sequence_map) = reopen!(&rocksdb, - LAST_COMMITTED_CF;, - SEQUENCE_CF; - ); - - Arc::new(ConsensusStore::new(last_committed_map, sequence_map)) -} - pub fn fixture_payload(number_of_batches: u8) -> IndexMap { let mut payload: IndexMap = IndexMap::new(); diff --git a/narwhal/types/src/consensus.rs b/narwhal/types/src/consensus.rs index 49b097a6ee387..d81538a536137 100644 --- a/narwhal/types/src/consensus.rs +++ b/narwhal/types/src/consensus.rs @@ -2,17 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::mutable_key_type)] -use crate::{Batch, Certificate, CertificateAPI, CertificateDigest, HeaderAPI, Round}; +use crate::{Batch, Certificate, CertificateAPI, CertificateDigest, HeaderAPI, Round, TimestampMs}; use config::{AuthorityIdentifier, Committee}; +use enum_dispatch::enum_dispatch; use fastcrypto::hash::Hash; use 
serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; -use store::{ - rocks::{DBMap, TypedStoreError}, - traits::Map, -}; use tokio::sync::mpsc; +use tracing::warn; /// A global sequence number assigned to every CommittedSubDag. pub type SequenceNumber = u64; @@ -35,9 +33,56 @@ pub struct CommittedSubDag { pub sub_dag_index: SequenceNumber, /// The so far calculated reputation score for nodes pub reputation_score: ReputationScores, + /// The timestamp that should identify this commit. This is guaranteed to be monotonically + /// incremented. This is not necessarily the leader's timestamp. We compare the leader's timestamp + /// with the previously committed sub dag timestamp and we always keep the max. + /// The property is explicitly private so the commit_timestamp() method should be used instead, + /// which bears additional resolution logic. + commit_timestamp: TimestampMs, } impl CommittedSubDag { + pub fn new( + certificates: Vec<Certificate>, + leader: Certificate, + sub_dag_index: SequenceNumber, + reputation_score: ReputationScores, + previous_sub_dag: Option<&CommittedSubDag>, + ) -> Self { + // Narwhal enforces some invariants on the header.created_at, so we can use it as a timestamp. + let previous_sub_dag_ts = previous_sub_dag + .map(|s| s.commit_timestamp) + .unwrap_or_default(); + let commit_timestamp = previous_sub_dag_ts.max(*leader.header().created_at()); + + if previous_sub_dag_ts > *leader.header().created_at() { + warn!(sub_dag_index = ?sub_dag_index, "Leader timestamp {} is older than previously committed sub dag timestamp {}. Auto-correcting to max {}.", + leader.header().created_at(), previous_sub_dag_ts, commit_timestamp); + } + + Self { + certificates, + leader, + sub_dag_index, + reputation_score, + commit_timestamp, + } + } + + pub fn from_commit( + commit: ConsensusCommit, + certificates: Vec<Certificate>, + leader: Certificate, + ) -> Self { + Self { + certificates, + leader, + sub_dag_index: commit.sub_dag_index(), + reputation_score: commit.reputation_score(), + commit_timestamp: commit.commit_timestamp(), + } + } + pub fn len(&self) -> usize { self.certificates.len() } @@ -63,6 +108,16 @@ impl CommittedSubDag { pub fn leader_round(&self) -> Round { self.leader.round() } + + pub fn commit_timestamp(&self) -> TimestampMs { + // If commit_timestamp is zero, then safely assume that this is an upgraded node that is + // replaying this commit and the field was never initialised. It's safe to fall back on the + // leader's timestamp. + if self.commit_timestamp == 0 { + return *self.leader.header().created_at(); + } + self.commit_timestamp + } } #[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)] @@ -105,13 +160,25 @@ impl ReputationScores { } } +#[enum_dispatch(ConsensusCommitAPI)] +trait ConsensusCommitAPI { + fn certificates(&self) -> Vec<CertificateDigest>; + fn leader(&self) -> CertificateDigest; + fn leader_round(&self) -> Round; + fn sub_dag_index(&self) -> SequenceNumber; + fn reputation_score(&self) -> ReputationScores; + fn commit_timestamp(&self) -> TimestampMs; +} + +// TODO: remove once the upgrade has been rolled out. We want to keep only the +// CommittedSubDag #[derive(Serialize, Deserialize, Clone, Debug)] pub struct CommittedSubDagShell { /// The sequence of committed certificates' digests. pub certificates: Vec<CertificateDigest>, /// The leader certificate's digest responsible of committing this sub-dag.
pub leader: CertificateDigest, - // The round of the leader + /// The round of the leader pub leader_round: Round, /// Sequence number of the CommittedSubDag pub sub_dag_index: SequenceNumber, @@ -119,7 +186,52 @@ pub struct CommittedSubDagShell { pub reputation_score: ReputationScores, } -impl CommittedSubDagShell { +impl ConsensusCommitAPI for CommittedSubDagShell { + fn certificates(&self) -> Vec { + self.certificates.clone() + } + + fn leader(&self) -> CertificateDigest { + self.leader + } + + fn leader_round(&self) -> Round { + self.leader_round + } + + fn sub_dag_index(&self) -> SequenceNumber { + self.sub_dag_index + } + + fn reputation_score(&self) -> ReputationScores { + self.reputation_score.clone() + } + + fn commit_timestamp(&self) -> TimestampMs { + // We explicitly return 0 as we don't have this information stored already. This will be + // handle accordingly to the CommittedSubdag struct. + 0 + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ConsensusCommitV2 { + /// The sequence of committed certificates' digests. + pub certificates: Vec, + /// The leader certificate's digest responsible of committing this sub-dag. + pub leader: CertificateDigest, + /// The round of the leader + pub leader_round: Round, + /// Sequence number of the CommittedSubDag + pub sub_dag_index: SequenceNumber, + /// The so far calculated reputation score for nodes + pub reputation_score: ReputationScores, + /// The timestamp that should identify this commit. This is guaranteed to be monotonically + /// incremented + pub commit_timestamp: TimestampMs, +} + +impl ConsensusCommitV2 { pub fn from_sub_dag(sub_dag: &CommittedSubDag) -> Self { Self { certificates: sub_dag.certificates.iter().map(|x| x.digest()).collect(), @@ -127,97 +239,208 @@ impl CommittedSubDagShell { leader_round: sub_dag.leader.round(), sub_dag_index: sub_dag.sub_dag_index, reputation_score: sub_dag.reputation_score.clone(), + commit_timestamp: sub_dag.commit_timestamp, } } } -/// Shutdown token dropped when a task is properly shut down. -pub type ShutdownToken = mpsc::Sender<()>; +impl ConsensusCommitAPI for ConsensusCommitV2 { + fn certificates(&self) -> Vec { + self.certificates.clone() + } + + fn leader(&self) -> CertificateDigest { + self.leader + } + + fn leader_round(&self) -> Round { + self.leader_round + } -/// Convenience type to propagate store errors. -pub type StoreResult = Result; + fn sub_dag_index(&self) -> SequenceNumber { + self.sub_dag_index + } + + fn reputation_score(&self) -> ReputationScores { + self.reputation_score.clone() + } -/// The persistent storage of the sequencer. -pub struct ConsensusStore { - /// The latest committed round of each validator. - last_committed: DBMap, - /// The global consensus sequence. - committed_sub_dags_by_index: DBMap, + fn commit_timestamp(&self) -> TimestampMs { + self.commit_timestamp + } } -impl ConsensusStore { - /// Create a new consensus store structure by using already loaded maps. - pub fn new( - last_committed: DBMap, - sequence: DBMap, - ) -> Self { - Self { - last_committed, - committed_sub_dags_by_index: sequence, +#[derive(Serialize, Deserialize, Clone, Debug)] +#[enum_dispatch(ConsensusCommitAPI)] +pub enum ConsensusCommit { + V1(CommittedSubDagShell), + V2(ConsensusCommitV2), +} + +impl ConsensusCommit { + pub fn certificates(&self) -> Vec { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.certificates(), + ConsensusCommit::V2(sub_dag) => sub_dag.certificates(), } } - /// Clear the store. 
- pub fn clear(&self) -> StoreResult<()> { - self.last_committed.clear()?; - self.committed_sub_dags_by_index.clear()?; - Ok(()) + pub fn leader(&self) -> CertificateDigest { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.leader(), + ConsensusCommit::V2(sub_dag) => sub_dag.leader(), + } } - /// Persist the consensus state. - pub fn write_consensus_state( - &self, - last_committed: &HashMap, - sub_dag: &CommittedSubDag, - ) -> Result<(), TypedStoreError> { - let shell = CommittedSubDagShell::from_sub_dag(sub_dag); + pub fn leader_round(&self) -> Round { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.leader_round(), + ConsensusCommit::V2(sub_dag) => sub_dag.leader_round(), + } + } - let mut write_batch = self.last_committed.batch(); - write_batch.insert_batch(&self.last_committed, last_committed.iter())?; - write_batch.insert_batch( - &self.committed_sub_dags_by_index, - std::iter::once((sub_dag.sub_dag_index, shell)), - )?; - write_batch.write() + pub fn sub_dag_index(&self) -> SequenceNumber { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.sub_dag_index(), + ConsensusCommit::V2(sub_dag) => sub_dag.sub_dag_index(), + } } - /// Load the last committed round of each validator. - pub fn read_last_committed(&self) -> HashMap { - self.last_committed.iter().collect() + pub fn reputation_score(&self) -> ReputationScores { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.reputation_score(), + ConsensusCommit::V2(sub_dag) => sub_dag.reputation_score(), + } } - /// Gets the latest sub dag index from the store - pub fn get_latest_sub_dag_index(&self) -> SequenceNumber { - let s = self - .committed_sub_dags_by_index - .iter() - .skip_to_last() - .next() - .map(|(seq, _)| seq) - .unwrap_or_default(); - s + pub fn commit_timestamp(&self) -> TimestampMs { + match self { + ConsensusCommit::V1(sub_dag) => sub_dag.commit_timestamp(), + ConsensusCommit::V2(sub_dag) => sub_dag.commit_timestamp(), + } } +} - /// Returns thet latest subdag committed. If none is committed yet, then - /// None is returned instead. - pub fn get_latest_sub_dag(&self) -> Option { - self.committed_sub_dags_by_index - .iter() - .skip_to_last() - .next() - .map(|(_, subdag)| subdag) - } - - /// Load all the sub dags committed with sequence number of at least `from`. - pub fn read_committed_sub_dags_from( - &self, - from: &SequenceNumber, - ) -> StoreResult> { - Ok(self - .committed_sub_dags_by_index - .iter() - .skip_to(from)? - .map(|(_, sub_dag)| sub_dag) - .collect()) +impl CommittedSubDagShell { + pub fn from_sub_dag(sub_dag: &CommittedSubDag) -> Self { + Self { + certificates: sub_dag.certificates.iter().map(|x| x.digest()).collect(), + leader: sub_dag.leader.digest(), + leader_round: sub_dag.leader.round(), + sub_dag_index: sub_dag.sub_dag_index, + reputation_score: sub_dag.reputation_score.clone(), + } + } +} + +/// Shutdown token dropped when a task is properly shut down. 
+pub type ShutdownToken = mpsc::Sender<()>; + +#[cfg(test)] +mod tests { + use crate::{Certificate, Header, HeaderV1Builder}; + use crate::{CommittedSubDag, ReputationScores}; + use config::AuthorityIdentifier; + use indexmap::IndexMap; + use std::collections::BTreeSet; + use test_utils::CommitteeFixture; + + #[test] + fn test_zero_timestamp_in_sub_dag() { + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + + let header_builder = HeaderV1Builder::default(); + let header = header_builder + .author(AuthorityIdentifier(1u16)) + .round(2) + .epoch(0) + .created_at(50) + .payload(IndexMap::new()) + .parents(BTreeSet::new()) + .build() + .unwrap(); + + let certificate = + Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + + // AND we initialise the sub dag via the "restore" way + let sub_dag_round = CommittedSubDag { + certificates: vec![certificate.clone()], + leader: certificate, + sub_dag_index: 1, + reputation_score: ReputationScores::default(), + commit_timestamp: 0, + }; + + // AND commit timestamp is the leader's timestamp + assert_eq!(sub_dag_round.commit_timestamp(), 50); + } + + #[test] + fn test_monotonically_incremented_commit_timestamps() { + // Create a certificate (leader) of round 2 with a high timestamp + let newer_timestamp = 100; + let older_timestamp = 50; + + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + + let header_builder = HeaderV1Builder::default(); + let header = header_builder + .author(AuthorityIdentifier(1u16)) + .round(2) + .epoch(0) + .created_at(newer_timestamp) + .payload(IndexMap::new()) + .parents(BTreeSet::new()) + .build() + .unwrap(); + + let certificate = + Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + + // AND + let sub_dag_round_2 = CommittedSubDag::new( + vec![certificate.clone()], + certificate, + 1, + ReputationScores::default(), + None, + ); + + // AND commit timestamp is the leader's timestamp + assert_eq!(sub_dag_round_2.commit_timestamp, newer_timestamp); + + // Now create the leader of round 4 with the older timestamp + let header_builder = HeaderV1Builder::default(); + let header = header_builder + .author(AuthorityIdentifier(1u16)) + .round(4) + .epoch(0) + .created_at(older_timestamp) + .payload(IndexMap::new()) + .parents(BTreeSet::new()) + .build() + .unwrap(); + + let certificate = + Certificate::new_unsigned(&committee, Header::V1(header), Vec::new()).unwrap(); + + // WHEN create the sub dag based on the "previously committed" sub dag. + let sub_dag_round_4 = CommittedSubDag::new( + vec![certificate.clone()], + certificate, + 2, + ReputationScores::default(), + Some(&sub_dag_round_2), + ); + + // THEN the latest sub dag should have the highest committed timestamp - basically the + // same as the previous commit round + assert_eq!( + sub_dag_round_4.commit_timestamp, + sub_dag_round_2.commit_timestamp + ); } }
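
For reference, the timestamp rule that `CommittedSubDag::new` applies above (and that `commit_timestamp()` re-applies when replaying legacy commits) reduces to taking the maximum of the leader's header timestamp and the previous commit's timestamp, so commit timestamps never move backwards. The standalone sketch below illustrates that rule under simplified assumptions; the free function and `main` are illustrative only and are not part of this patch.

// Illustrative sketch (not part of the patch): the commit-timestamp resolution rule.
// A commit's timestamp is the max of the leader's header timestamp and the previously
// committed sub dag's timestamp, so the sequence is monotonically non-decreasing.
type TimestampMs = u64;

fn resolve_commit_timestamp(
    leader_created_at: TimestampMs,
    previous_commit_timestamp: Option<TimestampMs>,
) -> TimestampMs {
    previous_commit_timestamp
        .unwrap_or_default()
        .max(leader_created_at)
}

fn main() {
    // A leader older than the previous commit does not move time backwards.
    assert_eq!(resolve_commit_timestamp(50, Some(100)), 100);
    // Otherwise the leader's timestamp is used.
    assert_eq!(resolve_commit_timestamp(120, Some(100)), 120);
    // First commit: fall back to the leader's timestamp.
    assert_eq!(resolve_commit_timestamp(50, None), 50);
}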