Skip to content

Commit

Permalink
[checkpoints] Simplify checkpoint roots calculation (MystenLabs#10772)
Browse files Browse the repository at this point in the history
Because we used to have 'streaming' interface from narwhal to sui, we
needed two tables (`consensus_message_order` and `checkpoint_boundary`)
to derive checkpoint roots on the commit boundary.

However, now that the interface is simplified, and we have an entire
commit, those tables are not needed. Bunch of code is removed and DB
writes are reduced
  • Loading branch information
andll authored Apr 12, 2023
1 parent 4b5caa9 commit cd81f95
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 38 deletions.
30 changes: 22 additions & 8 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use typed_store::rocks::{
};
use typed_store::traits::{TableSummary, TypedStoreDebug};

use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
use crate::authority::{AuthorityStore, ResolverWrapper};
use crate::checkpoints::{
BuilderCheckpointSummary, CheckpointCommitHeight, CheckpointServiceNotify, EpochStats,
Expand Down Expand Up @@ -165,6 +165,8 @@ pub struct AuthorityPerEpochStore {

/// Execution state that has to restart at each epoch change
execution_component: ExecutionComponents,
/// Fast cache for EpochFlag::InMemoryCheckpointRoots
pub(crate) in_memory_checkpoint_roots: bool,
}

/// AuthorityEpochTables contains tables that contain data that is only valid within an epoch.
Expand Down Expand Up @@ -219,6 +221,7 @@ pub struct AuthorityEpochTables {
/// Map stores pending transactions that this authority submitted to consensus
pending_consensus_transactions: DBMap<ConsensusTransactionKey, ConsensusTransaction>,

// todo - this table will be deleted after switch to EpochFlag::InMemoryCheckpointRoots
/// This is an inverse index for consensus_message_processed - it allows to select
/// all transactions at the specific consensus range
///
Expand All @@ -232,6 +235,7 @@ pub struct AuthorityEpochTables {
/// every message output by consensus (and in the right order).
last_consensus_index: DBMap<u64, ExecutionIndicesWithHash>,

// todo - this table will be deleted after switch to EpochFlag::InMemoryCheckpointRoots
/// This table lists all checkpoint boundaries in the consensus sequence
///
/// The key in this table is incremental index and value is corresponding narwhal
Expand Down Expand Up @@ -274,7 +278,7 @@ pub struct AuthorityEpochTables {
/// user signature for this transaction here. This will be included in the checkpoint later.
user_signatures_for_checkpoints: DBMap<TransactionDigest, Vec<GenericSignature>>,

/// This table is not ued
/// This table is not used
#[allow(dead_code)]
builder_checkpoint_summary: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
/// Maps sequence number to checkpoint summary, used by CheckpointBuilder to build checkpoint within epoch
Expand Down Expand Up @@ -389,6 +393,13 @@ impl AuthorityPerEpochStore {
);
let signature_verifier =
SignatureVerifier::new(committee.clone(), signature_verifier_metrics);
let in_memory_checkpoint_roots = epoch_start_configuration
.flags()
.contains(&EpochFlag::InMemoryCheckpointRoots);
info!(
"in_memory_checkpoint_roots = {}",
in_memory_checkpoint_roots
);
let s = Arc::new(Self {
committee,
protocol_config,
Expand All @@ -409,6 +420,7 @@ impl AuthorityPerEpochStore {
metrics,
epoch_start_configuration,
execution_component,
in_memory_checkpoint_roots,
});
s.update_buffer_stake_metric();
s
Expand Down Expand Up @@ -1304,11 +1316,13 @@ impl AuthorityPerEpochStore {
certificate: &VerifiedExecutableTransaction,
consensus_index: ExecutionIndicesWithHash,
) -> SuiResult {
let transaction_digest = *certificate.digest();
batch.insert_batch(
&self.tables.consensus_message_order,
[(consensus_index.index, transaction_digest)],
)?;
if !self.in_memory_checkpoint_roots {
let transaction_digest = *certificate.digest();
batch.insert_batch(
&self.tables.consensus_message_order,
[(consensus_index.index, transaction_digest)],
)?;
}
batch.insert_batch(
&self.tables.pending_execution,
[(*certificate.digest(), certificate.clone().serializable())],
Expand Down Expand Up @@ -1899,7 +1913,7 @@ impl AuthorityPerEpochStore {
}
}

fn record_end_of_message_quorum_time_metric(&self) {
pub fn record_end_of_message_quorum_time_metric(&self) {
if let Some(epoch_close_time) = *self.epoch_close_time.read() {
self.metrics
.epoch_end_of_publish_quorum_time_since_epoch_close_ms
Expand Down
33 changes: 10 additions & 23 deletions crates/sui-core/src/authority/epoch_start_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ pub trait EpochStartConfigTrait {
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum EpochFlag {}
pub enum EpochFlag {
InMemoryCheckpointRoots,
}

/// Parameters of the epoch fixed at epoch start.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -55,9 +57,10 @@ impl EpochStartConfiguration {
}

pub fn new_for_testing() -> Self {
Self::new_v1(
Self::new_v2(
EpochStartSystemState::new_for_testing(),
CheckpointDigest::default(),
EpochFlag::default_flags_for_new_epoch(),
)
}

Expand Down Expand Up @@ -98,13 +101,6 @@ impl EpochStartConfigurationV1 {
epoch_digest,
}
}

pub fn new_for_testing() -> Self {
Self::new(
EpochStartSystemState::new_for_testing(),
CheckpointDigest::default(),
)
}
}

impl EpochStartConfigurationV2 {
Expand All @@ -119,14 +115,6 @@ impl EpochStartConfigurationV2 {
flags,
}
}

pub fn new_for_testing() -> Self {
Self::new(
EpochStartSystemState::new_for_testing(),
CheckpointDigest::default(),
vec![],
)
}
}

impl EpochStartConfigTrait for EpochStartConfigurationV1 {
Expand Down Expand Up @@ -159,16 +147,15 @@ impl EpochStartConfigTrait for EpochStartConfigurationV2 {

impl EpochFlag {
pub fn default_flags_for_new_epoch() -> Vec<Self> {
vec![]
vec![EpochFlag::InMemoryCheckpointRoots]
}
}

impl fmt::Display for EpochFlag {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Important - implementation should return low cardinality values because this is used as metric key
// match self {
//
// }
Ok(())
match self {
EpochFlag::InMemoryCheckpointRoots => write!(f, "InMemoryCheckpointRoots"),
}
}
}
69 changes: 62 additions & 7 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::authority::authority_per_epoch_store::{
AuthorityPerEpochStore, ExecutionIndicesWithHash,
};
use crate::authority::AuthorityMetrics;
use crate::checkpoints::CheckpointService;
use crate::checkpoints::{
CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo,
};
use std::cmp::Ordering;

use crate::scoring_decision::update_low_scoring_authorities;
use crate::transaction_manager::TransactionManager;
Expand All @@ -20,7 +23,7 @@ use narwhal_types::{BatchAPI, CertificateAPI, ConsensusOutput, HeaderAPI};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::Arc;
Expand All @@ -31,7 +34,7 @@ use sui_types::messages::{
};
use sui_types::storage::ParentSync;

use tracing::{debug, error, instrument};
use tracing::{debug, error, info, instrument};

pub struct ConsensusHandler<T> {
/// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
Expand Down Expand Up @@ -206,7 +209,12 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {
}
}

let mut roots = BTreeSet::new();
for (seq, (serialized, transaction, output_cert)) in transactions.into_iter().enumerate() {
if let Some(digest) = transaction.executable_transaction_digest() {
roots.insert(digest);
}

let index = ExecutionIndices {
last_committed_round: round,
sub_dag_index: consensus_output.sub_dag.sub_dag_index,
Expand Down Expand Up @@ -289,10 +297,44 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {
self.transaction_scheduler
.schedule(transactions_to_schedule)
.await;

self.epoch_store
.handle_commit_boundary(round, timestamp, &self.checkpoint_service)
.expect("Unrecoverable error in consensus handler when processing commit boundary")
if self.epoch_store.in_memory_checkpoint_roots {
// The last block in this function notifies about new checkpoint if needed
let final_checkpoint_round = self
.epoch_store
.final_epoch_checkpoint()
.expect("final_epoch_checkpoint failed");
let final_checkpoint = match final_checkpoint_round.map(|r| r.cmp(&round)) {
Some(Ordering::Less) => {
debug!(
"Not forming checkpoint for round {} above final checkpoint round {:?}",
round, final_checkpoint_round
);
return;
}
Some(Ordering::Equal) => true,
Some(Ordering::Greater) => false,
None => false,
};
let checkpoint = PendingCheckpoint {
roots: roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: timestamp,
last_of_epoch: final_checkpoint,
commit_height: round,
},
};
self.checkpoint_service
.notify_checkpoint(&self.epoch_store, checkpoint)
.expect("notify_checkpoint has failed");
if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.epoch_store.record_end_of_message_quorum_time_metric();
}
} else {
self.epoch_store
.handle_commit_boundary(round, timestamp, &self.checkpoint_service)
.expect("Unrecoverable error in consensus handler when processing commit boundary")
}
}

async fn last_executed_sub_dag_index(&self) -> u64 {
Expand Down Expand Up @@ -415,6 +457,19 @@ impl SequencedConsensusTransactionKind {
SequencedConsensusTransactionKind::System(_) => true,
}
}

pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
match self {
SequencedConsensusTransactionKind::External(ext) => {
if let ConsensusTransactionKind::UserTransaction(txn) = &ext.kind {
Some(*txn.digest())
} else {
None
}
}
SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
}
}
}

impl SequencedConsensusTransaction {
Expand Down

0 comments on commit cd81f95

Please sign in to comment.