[consensus] re-apply consensus timestamp changes (MystenLabs#10507)
## Description 

Re-reverting the changes of MystenLabs#10427 so that all the improvements to
the consensus commit timestamp checks are applied.

Follow-up PR to make the changes backwards compatible:
MystenLabs#10508
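
A minimal standalone sketch of the main check being re-applied (the type alias and function name here are illustrative; the real logic lives in `crates/sui-core/src/consensus_handler.rs` below): the commit timestamp taken from the committed sub dag is clamped so it never precedes the epoch start time.

```rust
// Illustrative sketch only; `TimestampMs` and the function name are not from the diff.
type TimestampMs = u64;

/// Clamp the consensus commit timestamp so it is never earlier than the epoch start.
fn clamp_to_epoch_start(commit_timestamp: TimestampMs, epoch_start: TimestampMs) -> TimestampMs {
    if commit_timestamp < epoch_start {
        // The real handler also logs an error with the leader author and round.
        epoch_start
    } else {
        commit_timestamp
    }
}

fn main() {
    assert_eq!(clamp_to_epoch_start(900, 1_000), 1_000); // too early: clamped up to epoch start
    assert_eq!(clamp_to_epoch_start(1_500, 1_000), 1_500); // already valid: unchanged
}
```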

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
akichidis authored Apr 8, 2023
1 parent e92d5a6 commit 83a81eb
Showing 17 changed files with 718 additions and 224 deletions.
6 changes: 5 additions & 1 deletion crates/sui-core/src/authority/epoch_start_configuration.rs
@@ -5,7 +5,7 @@ use enum_dispatch::enum_dispatch;
use serde::{Deserialize, Serialize};

use sui_types::epoch_data::EpochData;
use sui_types::messages_checkpoint::CheckpointDigest;
use sui_types::messages_checkpoint::{CheckpointDigest, CheckpointTimestamp};
use sui_types::sui_system_state::epoch_start_sui_system_state::{
EpochStartSystemState, EpochStartSystemStateTrait,
};
@@ -42,6 +42,10 @@ impl EpochStartConfiguration {
self.epoch_digest(),
)
}

pub fn epoch_start_timestamp_ms(&self) -> CheckpointTimestamp {
self.epoch_start_state().epoch_start_timestamp_ms()
}
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
26 changes: 17 additions & 9 deletions crates/sui-core/src/consensus_handler.rs
@@ -29,7 +29,6 @@ use sui_types::messages::{
ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
VerifiedExecutableTransaction, VerifiedTransaction,
};

use sui_types::storage::ParentSync;

use tracing::{debug, error, instrument};
@@ -131,8 +130,22 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

/* (serialized, transaction, output_cert) */
let mut transactions = vec![];
// Narwhal enforces some invariants on the header.created_at, so we can use it as a timestamp
let timestamp = *consensus_output.sub_dag.leader.header().created_at();
let timestamp = consensus_output.sub_dag.commit_timestamp();
let leader_author = consensus_output.sub_dag.leader.header().author();

let epoch_start = self
.epoch_store
.epoch_start_config()
.epoch_start_timestamp_ms();
let timestamp = if timestamp < epoch_start {
error!(
"Unexpected commit timestamp {timestamp} less than epoch start time {epoch_start}, author {leader_author}, round {round}",
);
epoch_start
} else {
timestamp
};

let prologue_transaction = self.consensus_commit_prologue_transaction(round, timestamp);
transactions.push((
@@ -152,12 +165,7 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

self.metrics
.consensus_committed_subdags
.with_label_values(&[&consensus_output
.sub_dag
.leader
.header()
.author()
.to_string()])
.with_label_values(&[&leader_author.to_string()])
.inc();
for (cert, batches) in consensus_output.batches {
let author = cert.header().author();
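
Note: the handler above now reads the timestamp from `sub_dag.commit_timestamp()` instead of the leader header's `created_at`. One plausible way such a commit timestamp can be kept monotonically non-decreasing across consecutive commits is sketched below; this is an assumption for illustration, not code from this diff.

```rust
// Hypothetical sketch: the committed timestamp never moves backwards,
// even if a leader header carries an older wall-clock time.
type TimestampMs = u64;

/// Commit timestamp for the next sub dag: the leader's header timestamp,
/// but never smaller than the previous commit's timestamp.
fn next_commit_timestamp(
    leader_created_at: TimestampMs,
    previous_commit_timestamp: TimestampMs,
) -> TimestampMs {
    leader_created_at.max(previous_commit_timestamp)
}

fn main() {
    assert_eq!(next_commit_timestamp(95, 100), 100); // older header time: keep previous timestamp
    assert_eq!(next_commit_timestamp(120, 100), 120); // newer header time: advance
}
```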
7 changes: 4 additions & 3 deletions narwhal/consensus/benches/process_certificates.rs
@@ -14,6 +14,7 @@ use narwhal_consensus as consensus;
use pprof::criterion::{Output, PProfProfiler};
use prometheus::Registry;
use std::{collections::BTreeSet, sync::Arc};
use storage::NodeStorage;
use test_utils::{make_consensus_store, make_optimal_certificates, temp_dir, CommitteeFixture};
use tokio::time::Instant;
use types::{Certificate, Round};
@@ -41,10 +42,10 @@ pub fn process_certificates(c: &mut Criterion) {
make_optimal_certificates(&committee, 1..=rounds, &genesis, &keys);

let store_path = temp_dir();
let store = make_consensus_store(&store_path);
let store = NodeStorage::reopen(&store_path, None);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));

let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);
let mut state = ConsensusState::new(metrics.clone(), gc_depth);

let data_size: usize = certificates
.iter()
@@ -54,7 +55,7 @@

let mut ordering_engine = Bullshark {
committee: committee.clone(),
store,
store: store.consensus_store,
metrics,
last_successful_leader_election_timestamp: Instant::now(),
last_leader_election: Default::default(),
66 changes: 37 additions & 29 deletions narwhal/consensus/src/bullshark.rs
@@ -9,11 +9,12 @@ use crate::{
use config::{AuthorityIdentifier, Committee, Stake};
use fastcrypto::hash::Hash;
use std::sync::Arc;
use storage::ConsensusStore;
use tokio::time::Instant;
use tracing::{debug, error_span};
use types::{
Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI,
ReputationScores, Round,
Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, HeaderAPI, ReputationScores,
Round,
};

#[cfg(test)]
Expand Down Expand Up @@ -155,7 +156,7 @@ impl ConsensusProtocol for Bullshark {
.iter()
.rev()
{
let sub_dag_index = state.latest_sub_dag_index + 1;
let sub_dag_index = state.next_sub_dag_index();
let _span = error_span!("bullshark_process_sub_dag", sub_dag_index);

debug!("Leader {:?} has enough support", leader);
@@ -178,23 +179,23 @@

total_committed_certificates += sequence.len();

// We update the reputation score stored in state
let reputation_score = self.update_reputation_score(state, &sequence, sub_dag_index);
// We resolve the reputation score that should be stored alongside with this sub dag.
let reputation_score = self.resolve_reputation_score(state, &sequence, sub_dag_index);

let sub_dag = CommittedSubDag {
certificates: sequence,
leader: leader.clone(),
let sub_dag = CommittedSubDag::new(
sequence,
leader.clone(),
sub_dag_index,
reputation_score,
};
state.last_committed_sub_dag.as_ref(),
);

// Persist the update.
self.store
.write_consensus_state(&state.last_committed, &sub_dag)?;

// Increase the global consensus index.
state.latest_sub_dag_index = sub_dag_index;
state.last_committed_leader = Some(sub_dag.leader.digest());
// Update the last sub dag
state.last_committed_sub_dag = Some(sub_dag.clone());

committed_sub_dags.push(sub_dag);
}
@@ -302,52 +303,59 @@ impl Bullshark {
dag.get(&round).and_then(|x| x.get(&leader))
}

/// Updates and calculates the reputation score for the current commit managing any internal state.
/// It returns the updated reputation score.
fn update_reputation_score(
&mut self,
/// Calculates the reputation score for the current commit by taking into account the reputation
/// scores from the previous commit (assuming that exists). It returns the updated reputation score.
fn resolve_reputation_score(
&self,
state: &mut ConsensusState,
committed_sequence: &[Certificate],
sub_dag_index: u64,
) -> ReputationScores {
// we reset the scores for every schedule change window.
// we reset the scores for every schedule change window, or initialise when it's the first
// sub dag we are going to create.
// TODO: when schedule change is implemented we should probably change a little bit
// this logic here.
if sub_dag_index % self.num_sub_dags_per_schedule == 0 {
state.last_consensus_reputation_score = ReputationScores::new(&self.committee)
}
let mut reputation_score =
if sub_dag_index == 1 || sub_dag_index % self.num_sub_dags_per_schedule == 0 {
ReputationScores::new(&self.committee)
} else {
state
.last_committed_sub_dag
.as_ref()
.expect("Committed sub dag should always exist for sub_dag_index > 1")
.reputation_score
.clone()
};

// update the score for the previous leader. If no previous leader exists,
// then this is the first time we commit a leader, so no score update takes place
if let Some(previous_leader) = state.last_committed_leader {
if let Some(last_committed_sub_dag) = state.last_committed_sub_dag.as_ref() {
for certificate in committed_sequence {
// TODO: we could iterate only the certificates of the round above the previous leader's round
if certificate
.header()
.parents()
.iter()
.any(|digest| *digest == previous_leader)
.any(|digest| *digest == last_committed_sub_dag.leader.digest())
{
state
.last_consensus_reputation_score
.add_score(certificate.origin(), 1);
reputation_score.add_score(certificate.origin(), 1);
}
}
}

// we check if this is the last subdag of the current schedule. If yes then we mark the
// we check if this is the last sub dag of the current schedule. If yes then we mark the
// scores as final_of_schedule = true so any downstream user can know that those are the last
// ones calculated for the current schedule.
state.last_consensus_reputation_score.final_of_schedule =
reputation_score.final_of_schedule =
(sub_dag_index + 1) % self.num_sub_dags_per_schedule == 0;

// Always ensure that all the authorities are present in the reputation scores - even
// when score is zero.
assert_eq!(
state.last_consensus_reputation_score.total_authorities() as usize,
reputation_score.total_authorities() as usize,
self.committee.size()
);

state.last_consensus_reputation_score.clone()
reputation_score
}
}
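
Note: `resolve_reputation_score` above now carries scores forward from the previous committed sub dag, resets them on the first sub dag and at every schedule boundary, and flags the last sub dag of a window as final. A small standalone sketch of that window arithmetic (function names are illustrative; the conditions mirror the diff):

```rust
// Sketch of the schedule-window checks; plain integers stand in for the real types.
fn starts_new_window(sub_dag_index: u64, num_sub_dags_per_schedule: u64) -> bool {
    // Fresh scores on the very first sub dag and at every schedule boundary.
    sub_dag_index == 1 || sub_dag_index % num_sub_dags_per_schedule == 0
}

fn is_final_of_schedule(sub_dag_index: u64, num_sub_dags_per_schedule: u64) -> bool {
    // The sub dag right before the next reset carries the final scores of the window.
    (sub_dag_index + 1) % num_sub_dags_per_schedule == 0
}

fn main() {
    let window = 100;
    assert!(starts_new_window(1, window)); // first sub dag ever
    assert!(starts_new_window(100, window)); // schedule boundary
    assert!(!starts_new_window(57, window)); // otherwise scores are carried forward
    assert!(is_final_of_schedule(99, window)); // last sub dag of the current window
}
```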
76 changes: 48 additions & 28 deletions narwhal/consensus/src/consensus.rs
@@ -14,13 +14,12 @@ use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::Arc,
};
use storage::CertificateStore;
use storage::{CertificateStore, ConsensusStore};
use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, info, instrument};
use types::{
metered_channel, Certificate, CertificateAPI, CertificateDigest, CommittedSubDag,
CommittedSubDagShell, ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI,
ReputationScores, Round, Timestamp,
ConditionalBroadcastReceiver, ConsensusCommit, HeaderAPI, Round, Timestamp,
};

#[cfg(test)]
@@ -39,13 +38,8 @@ pub struct ConsensusState {
/// Keeps the last committed round for each authority. This map is used to clean up the dag and
/// ensure we don't commit twice the same certificate.
pub last_committed: HashMap<AuthorityIdentifier, Round>,
/// Used to populate the index in the sub-dag construction.
pub latest_sub_dag_index: SequenceNumber,
/// The last calculated consensus reputation score
pub last_consensus_reputation_score: ReputationScores,
/// The last committed sub dag leader. This allow us to calculate the reputation score of the nodes
/// that vote for the last leader.
pub last_committed_leader: Option<CertificateDigest>,
/// The last committed sub dag. If value is None, it means that we haven't committed any sub dag yet.
pub last_committed_sub_dag: Option<CommittedSubDag>,
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
pub dag: Dag,
Expand All @@ -54,15 +48,13 @@ pub struct ConsensusState {
}

impl ConsensusState {
pub fn new(metrics: Arc<ConsensusMetrics>, committee: &Committee, gc_depth: Round) -> Self {
pub fn new(metrics: Arc<ConsensusMetrics>, gc_depth: Round) -> Self {
Self {
last_round: ConsensusRound::default(),
gc_depth,
last_committed: Default::default(),
latest_sub_dag_index: 0,
dag: Default::default(),
last_consensus_reputation_score: ReputationScores::new(committee),
last_committed_leader: None,
last_committed_sub_dag: None,
metrics,
}
}
@@ -72,40 +64,58 @@ impl ConsensusState {
last_committed_round: Round,
gc_depth: Round,
recovered_last_committed: HashMap<AuthorityIdentifier, Round>,
latest_sub_dag: Option<CommittedSubDagShell>,
latest_sub_dag: Option<ConsensusCommit>,
cert_store: CertificateStore,
committee: &Committee,
) -> Self {
let last_round = ConsensusRound::new_with_gc_depth(last_committed_round, gc_depth);

let dag = Self::construct_dag_from_cert_store(
cert_store,
&cert_store,
&recovered_last_committed,
last_round.gc_round,
)
.expect("error when recovering DAG from store");
metrics.recovered_consensus_state.inc();

let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) =
latest_sub_dag
.map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader)))
.unwrap_or((0, ReputationScores::new(committee), None));
let last_committed_sub_dag = if let Some(latest_sub_dag) = latest_sub_dag.as_ref() {
let certificates = latest_sub_dag
.certificates()
.iter()
.map(|s| {
cert_store
.read(*s)
.unwrap()
.expect("Certificate should be found in database")
})
.collect();

let leader = cert_store
.read(latest_sub_dag.leader())
.unwrap()
.expect("Certificate should be found in database");

Some(CommittedSubDag::from_commit(
latest_sub_dag.clone(),
certificates,
leader,
))
} else {
None
};

Self {
gc_depth,
last_round,
last_committed: recovered_last_committed,
last_consensus_reputation_score,
latest_sub_dag_index,
last_committed_leader,
last_committed_sub_dag,
dag,
metrics,
}
}

#[instrument(level = "info", skip_all)]
pub fn construct_dag_from_cert_store(
cert_store: CertificateStore,
cert_store: &CertificateStore,
last_committed: &HashMap<AuthorityIdentifier, Round>,
gc_round: Round,
) -> Result<Dag, ConsensusError> {
@@ -229,6 +239,15 @@ impl ConsensusState {
panic!("Parent round not found in DAG for {certificate:?}!");
}
}

/// Provides the next index to be used for the next produced sub dag
pub fn next_sub_dag_index(&self) -> SequenceNumber {
self.last_committed_sub_dag
.as_ref()
.map(|s| s.sub_dag_index)
.unwrap_or_default()
+ 1
}
}

/// Describe how to sequence input certificates.
@@ -336,9 +355,11 @@
let latest_sub_dag = store.get_latest_sub_dag();
if let Some(sub_dag) = &latest_sub_dag {
assert_eq!(
sub_dag.leader_round, last_committed_round,
sub_dag.leader_round(),
last_committed_round,
"Last subdag leader round {} is not equal to the last committed round {}!",
sub_dag.leader_round, last_committed_round,
sub_dag.leader_round(),
last_committed_round,
);
}

Expand All @@ -349,7 +370,6 @@ where
recovered_last_committed,
latest_sub_dag,
cert_store,
&committee,
);

tx_consensus_round_updates
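
Note: with `latest_sub_dag_index`, `last_consensus_reputation_score`, and `last_committed_leader` folded into `last_committed_sub_dag`, the next sub dag index is now derived from the last committed sub dag (defaulting to 1 on a fresh state). A standalone sketch of that derivation, with a simplified stand-in for the real `CommittedSubDag`:

```rust
// Simplified stand-in for the real CommittedSubDag; only the index field matters here.
struct CommittedSubDag {
    sub_dag_index: u64,
}

/// Next index: one past the last committed sub dag's index, or 1 if nothing was committed yet.
fn next_sub_dag_index(last_committed_sub_dag: &Option<CommittedSubDag>) -> u64 {
    last_committed_sub_dag
        .as_ref()
        .map(|s| s.sub_dag_index)
        .unwrap_or_default()
        + 1
}

fn main() {
    assert_eq!(next_sub_dag_index(&None), 1); // fresh state: first index is 1
    assert_eq!(
        next_sub_dag_index(&Some(CommittedSubDag { sub_dag_index: 7 })),
        8
    );
}
```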