Commit fb59203
Showing 2 changed files with 37 additions and 195 deletions.
166 changes: 7 additions & 159 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -4,7 +4,6 @@
use futures::future::{join_all, select, Either};
use futures::FutureExt;
use narwhal_executor::ExecutionIndices;
use narwhal_types::Round;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLockReadGuard};
use rocksdb::Options;
@@ -35,7 +34,7 @@ use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfigura
use crate::authority::{AuthorityStore, ResolverWrapper};
use crate::checkpoints::{
BuilderCheckpointSummary, CheckpointCommitHeight, CheckpointServiceNotify, EpochStats,
PendingCheckpoint, PendingCheckpointInfo,
PendingCheckpoint,
};
use crate::consensus_handler::{
SequencedConsensusTransaction, SequencedConsensusTransactionKey,
@@ -53,7 +52,6 @@ use mysten_common::sync::notify_once::NotifyOnce;
use mysten_common::sync::notify_read::NotifyRead;
use mysten_metrics::monitored_scope;
use prometheus::IntCounter;
use std::cmp::Ordering as CmpOrdering;
use sui_adapter::adapter;
use sui_macros::fail_point;
use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
@@ -65,7 +63,6 @@ use sui_types::executable_transaction::{
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::messages_checkpoint::{
CheckpointContents, CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointSummary,
CheckpointTimestamp,
};
use sui_types::messages_consensus::{
AuthorityCapabilities, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
@@ -170,8 +167,6 @@ pub struct AuthorityPerEpochStore {

/// Execution state that has to restart at each epoch change
execution_component: ExecutionComponents,
/// Fast cache for EpochFlag::InMemoryCheckpointRoots
pub(crate) in_memory_checkpoint_roots: bool,
}

/// AuthorityEpochTables contains tables that contain data that is only valid within an epoch.
@@ -228,12 +223,8 @@ pub struct AuthorityEpochTables {
#[default_options_override_fn = "pending_consensus_transactions_table_default_config"]
pending_consensus_transactions: DBMap<ConsensusTransactionKey, ConsensusTransaction>,

// todo - this table will be deleted after switch to EpochFlag::InMemoryCheckpointRoots
/// This is an inverse index for consensus_message_processed - it allows to select
/// all transactions at the specific consensus range
///
/// The consensus position for the transaction is defined as first position at which valid
/// certificate for this transaction is seen in consensus
/// this table is not used
#[allow(dead_code)]
consensus_message_order: DBMap<ExecutionIndices, TransactionDigest>,

/// The following table is used to store a single value (the corresponding key is a constant). The value
@@ -242,11 +233,8 @@ pub struct AuthorityEpochTables {
/// every message output by consensus (and in the right order).
last_consensus_index: DBMap<u64, ExecutionIndicesWithHash>,

// todo - this table will be deleted after switch to EpochFlag::InMemoryCheckpointRoots
/// This table lists all checkpoint boundaries in the consensus sequence
///
/// The key in this table is incremental index and value is corresponding narwhal
/// consensus output index
/// this table is not used
#[allow(dead_code)]
checkpoint_boundary: DBMap<u64, u64>,

/// This table contains current reconfiguration state for validator for current epoch
@@ -423,13 +411,9 @@ impl AuthorityPerEpochStore {
);
let signature_verifier =
SignatureVerifier::new(committee.clone(), signature_verifier_metrics);
let in_memory_checkpoint_roots = epoch_start_configuration
assert!(epoch_start_configuration
.flags()
.contains(&EpochFlag::InMemoryCheckpointRoots);
info!(
"in_memory_checkpoint_roots = {}",
in_memory_checkpoint_roots
);
.contains(&EpochFlag::InMemoryCheckpointRoots));
let s = Arc::new(Self {
committee,
protocol_config,
@@ -450,7 +434,6 @@
metrics,
epoch_start_configuration,
execution_component,
in_memory_checkpoint_roots,
});
s.update_buffer_stake_metric();
s
@@ -644,13 +627,6 @@ impl AuthorityPerEpochStore {
Ok(self.tables.next_shared_object_versions.multi_get(ids)?)
}

pub fn get_last_checkpoint_boundary(&self) -> (u64, Option<u64>) {
match self.tables.checkpoint_boundary.iter().skip_to_last().next() {
Some((idx, height)) => (idx, Some(height)),
None => (0, None),
}
}

pub fn get_last_consensus_index(&self) -> SuiResult<ExecutionIndicesWithHash> {
self.tables
.last_consensus_index
@@ -673,27 +649,6 @@
.collect())
}

pub fn get_transactions_in_checkpoint_range(
&self,
from_height_excluded: Option<u64>,
to_height_included: u64,
) -> SuiResult<Vec<TransactionDigest>> {
let mut iter = self.tables.consensus_message_order.iter();
if let Some(from_height_excluded) = from_height_excluded {
let last_previous = ExecutionIndices::end_for_commit(from_height_excluded);
iter = iter.skip_to(&last_previous)?;
}
// skip_to lands to key the last_key or key after it
// technically here we need to check if first item in stream has a key equal to last_previous
// however in practice this can not happen because number of batches in certificate is
// limited and is less then u64::MAX
let roots: Vec<_> = iter
.take_while(|(idx, _tx)| idx.last_committed_round <= to_height_included)
.map(|(_idx, tx)| tx)
.collect();
Ok(roots)
}
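
The core trick in the deleted helper above is a half-open range scan over an ordered map: start strictly after the previous checkpoint boundary and stop at the new one, inclusive. Below is a minimal runnable sketch of that pattern; the `BTreeMap`, `u64` rounds, and `&str` digests are simplified stand-ins for the `consensus_message_order` `DBMap`, `ExecutionIndices`, and `TransactionDigest`, not the real Sui API.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

// Select every transaction whose consensus position lies in the half-open
// range (from_round_excluded, to_round_included].
fn transactions_in_range(
    order: &BTreeMap<u64, &'static str>,
    from_round_excluded: Option<u64>,
    to_round_included: u64,
) -> Vec<&'static str> {
    let lower = match from_round_excluded {
        // Start strictly after the previous checkpoint boundary...
        Some(round) => Bound::Excluded(round),
        // ...or from the beginning if no boundary has been recorded yet.
        None => Bound::Unbounded,
    };
    order
        .range((lower, Bound::Included(to_round_included)))
        .map(|(_round, digest)| *digest)
        .collect()
}

fn main() {
    let mut order = BTreeMap::new();
    order.insert(1u64, "tx-a");
    order.insert(2, "tx-b");
    order.insert(3, "tx-c");
    // Roots for the checkpoint covering rounds (1, 3]: tx-b and tx-c.
    assert_eq!(
        transactions_in_range(&order, Some(1), 3),
        vec!["tx-b", "tx-c"]
    );
}
```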

/// Returns future containing the state digest for the given epoch
/// once available
pub async fn notify_read_checkpoint_state_digests(
@@ -919,11 +874,6 @@ impl AuthorityPerEpochStore {
.await
}

pub fn insert_checkpoint_boundary(&self, index: u64, height: u64) -> SuiResult {
self.tables.checkpoint_boundary.insert(&index, &height)?;
Ok(())
}

/// When submitting a certificate caller **must** provide a ReconfigState lock guard
/// and verify that it allows new user certificates
pub fn insert_pending_consensus_transactions(
@@ -1303,7 +1253,6 @@ impl AuthorityPerEpochStore {
/// Tables updated:
/// * consensus_message_processed - indicate that this certificate was processed by consensus
/// * last_consensus_index - records last processed position in consensus stream
/// * consensus_message_order - records at what position this transaction was first seen in consensus
/// Self::consensus_message_processed returns true after this call for given certificate
fn finish_consensus_transaction_process_with_batch(
&self,
@@ -1346,13 +1295,6 @@ impl AuthorityPerEpochStore {
certificate: &VerifiedExecutableTransaction,
consensus_index: ExecutionIndicesWithHash,
) -> SuiResult {
if !self.in_memory_checkpoint_roots {
let transaction_digest = *certificate.digest();
batch.insert_batch(
&self.tables.consensus_message_order,
[(consensus_index.index, transaction_digest)],
)?;
}
batch.insert_batch(
&self.tables.pending_execution,
[(*certificate.digest(), certificate.clone().serializable())],
@@ -1377,56 +1319,6 @@
.get(&FINAL_EPOCH_CHECKPOINT_INDEX)?)
}

/// Returns transaction digests from consensus_message_order table in the "checkpoint range".
///
/// Checkpoint range is defined from the last seen checkpoint(excluded) to the provided
/// to_height (included)
pub fn last_checkpoint(
&self,
to_height_included: u64,
) -> SuiResult<Option<(u64, Vec<TransactionDigest>)>> {
let (index, from_height_excluded) = self.get_last_checkpoint_boundary();

if let Some(from_height_excluded) = from_height_excluded {
if from_height_excluded >= to_height_included {
// Due to crash recovery we might enter this function twice for same boundary
debug!("Not returning last checkpoint - already processed");
return Ok(None);
}
}

let roots =
self.get_transactions_in_checkpoint_range(from_height_excluded, to_height_included)?;

debug!(
"Selected {} roots between narwhal commit rounds {:?} and {}",
roots.len(),
from_height_excluded,
to_height_included
);

Ok(Some((index, roots)))
}
pub fn record_checkpoint_boundary(&self, commit_round: u64) -> SuiResult {
let (index, height) = self.get_last_checkpoint_boundary();

if let Some(height) = height {
if height >= commit_round {
// Due to crash recovery we might see same boundary twice
debug!("Not recording checkpoint boundary - already updated");
return Ok(());
}
}

let index = index + 1;
debug!(
"Recording checkpoint boundary {} at {}",
index, commit_round
);
self.insert_checkpoint_boundary(index, commit_round)?;
Ok(())
}
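
The deleted `last_checkpoint`/`record_checkpoint_boundary` pair was restart-safe because the boundary only ever moves forward: replaying the same commit round after a crash is a no-op. A standalone sketch of that idempotent recording, with a `Vec` of `(index, commit_round)` pairs standing in for the RocksDB table (simplified, assumed types):

```rust
// The boundary only advances; duplicate rounds seen during crash recovery
// are ignored, mirroring the deleted record_checkpoint_boundary.
struct BoundaryLog {
    entries: Vec<(u64, u64)>,
}

impl BoundaryLog {
    fn record(&mut self, commit_round: u64) {
        if let Some(&(_, last_round)) = self.entries.last() {
            if last_round >= commit_round {
                // Crash recovery can replay a boundary; ignore duplicates.
                return;
            }
        }
        // Mirrors the original's (0, None) starting state and index + 1 step.
        let next_index = self.entries.last().map_or(1, |&(index, _)| index + 1);
        self.entries.push((next_index, commit_round));
    }
}

fn main() {
    let mut log = BoundaryLog { entries: Vec::new() };
    log.record(5);
    log.record(5); // replayed after a crash: ignored
    log.record(9);
    assert_eq!(log.entries, vec![(1, 5), (2, 9)]);
}
```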

pub fn get_reconfig_state_read_lock_guard(
&self,
) -> parking_lot::RwLockReadGuard<ReconfigState> {
@@ -1726,50 +1618,6 @@ impl AuthorityPerEpochStore {
}
}

pub fn handle_commit_boundary<C: CheckpointServiceNotify>(
&self,
round: Round,
timestamp_ms: CheckpointTimestamp,
checkpoint_service: &Arc<C>,
) -> SuiResult {
debug!("Commit boundary at {}", round);
// This exchange is restart safe because of following:
//
// We try to read last checkpoint content and send it to the checkpoint service
// CheckpointService::notify_checkpoint is idempotent in case you send same last checkpoint multiple times
//
// Only after CheckpointService::notify_checkpoint stores checkpoint in it's store we update checkpoint boundary
if let Some((index, roots)) = self.last_checkpoint(round)? {
let final_checkpoint_round = self.final_epoch_checkpoint()?;
let final_checkpoint = match final_checkpoint_round.map(|r| r.cmp(&round)) {
Some(CmpOrdering::Less) => {
debug!(
"Not forming checkpoint for round {} above final checkpoint round {:?}",
round, final_checkpoint_round
);
return Ok(());
}
Some(CmpOrdering::Equal) => true,
Some(CmpOrdering::Greater) => false,
None => false,
};
let checkpoint = PendingCheckpoint {
roots,
details: PendingCheckpointInfo {
timestamp_ms,
last_of_epoch: final_checkpoint,
commit_height: index,
},
};
checkpoint_service.notify_checkpoint(self, checkpoint)?;
if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.record_end_of_message_quorum_time_metric();
}
}
self.record_checkpoint_boundary(round)
}

pub fn get_pending_checkpoints(
&self,
last: Option<CheckpointCommitHeight>,
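
Taken together, the changes to this file replace a per-call-site branch on `in_memory_checkpoint_roots` with a single construction-time invariant: the epoch must carry `EpochFlag::InMemoryCheckpointRoots`, or the store refuses to start. A minimal sketch of that pattern follows; the names mirror the diff but are simplified stand-ins, not the real Sui types.

```rust
#[derive(Debug, PartialEq)]
enum EpochFlag {
    InMemoryCheckpointRoots,
}

struct EpochStartConfiguration {
    flags: Vec<EpochFlag>,
}

impl EpochStartConfiguration {
    fn flags(&self) -> &[EpochFlag] {
        &self.flags
    }
}

// No cached in_memory_checkpoint_roots bool: since the flag is mandatory,
// every call site can assume the in-memory checkpoint-root path.
struct AuthorityPerEpochStore;

impl AuthorityPerEpochStore {
    fn new(config: &EpochStartConfiguration) -> Self {
        // Fail fast if an epoch is ever started without the required flag.
        assert!(config
            .flags()
            .contains(&EpochFlag::InMemoryCheckpointRoots));
        AuthorityPerEpochStore
    }
}

fn main() {
    let config = EpochStartConfiguration {
        flags: vec![EpochFlag::InMemoryCheckpointRoots],
    };
    let _store = AuthorityPerEpochStore::new(&config);
    println!("store constructed; in-memory checkpoint roots assumed");
}
```

Asserting once at construction means the dead-code fallback tables (`consensus_message_order`, `checkpoint_boundary`) can be retired without leaving conditional paths behind.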
66 changes: 30 additions & 36 deletions crates/sui-core/src/consensus_handler.rs
@@ -309,43 +309,37 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {
self.transaction_scheduler
.schedule(transactions_to_schedule)
.await;
if self.epoch_store.in_memory_checkpoint_roots {
// The last block in this function notifies about new checkpoint if needed
let final_checkpoint_round = self
.epoch_store
.final_epoch_checkpoint()
.expect("final_epoch_checkpoint failed");
let final_checkpoint = match final_checkpoint_round.map(|r| r.cmp(&round)) {
Some(Ordering::Less) => {
debug!(
"Not forming checkpoint for round {} above final checkpoint round {:?}",
round, final_checkpoint_round
);
return;
}
Some(Ordering::Equal) => true,
Some(Ordering::Greater) => false,
None => false,
};
let checkpoint = PendingCheckpoint {
roots: roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: timestamp,
last_of_epoch: final_checkpoint,
commit_height: round,
},
};
self.checkpoint_service
.notify_checkpoint(&self.epoch_store, checkpoint)
.expect("notify_checkpoint has failed");
if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.epoch_store.record_end_of_message_quorum_time_metric();
// The last block in this function notifies about new checkpoint if needed
let final_checkpoint_round = self
.epoch_store
.final_epoch_checkpoint()
.expect("final_epoch_checkpoint failed");
let final_checkpoint = match final_checkpoint_round.map(|r| r.cmp(&round)) {
Some(Ordering::Less) => {
debug!(
"Not forming checkpoint for round {} above final checkpoint round {:?}",
round, final_checkpoint_round
);
return;
}
} else {
self.epoch_store
.handle_commit_boundary(round, timestamp, &self.checkpoint_service)
.expect("Unrecoverable error in consensus handler when processing commit boundary")
Some(Ordering::Equal) => true,
Some(Ordering::Greater) => false,
None => false,
};
let checkpoint = PendingCheckpoint {
roots: roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: timestamp,
last_of_epoch: final_checkpoint,
commit_height: round,
},
};
self.checkpoint_service
.notify_checkpoint(&self.epoch_store, checkpoint)
.expect("notify_checkpoint has failed");
if final_checkpoint {
info!(epoch=?self.epoch(), "Received 2f+1 EndOfPublish messages, notifying last checkpoint");
self.epoch_store.record_end_of_message_quorum_time_metric();
}
}
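
The logic retained in `ConsensusHandler` decides, per commit, whether to skip checkpoint formation, form the epoch's final checkpoint, or form an ordinary one, by comparing the commit round against an optional final-checkpoint round. A small self-contained sketch of that decision (names simplified and assumed; `None` models an epoch whose end round has not been fixed yet):

```rust
use std::cmp::Ordering;

enum Decision {
    /// Past the final checkpoint round: form no checkpoint at all.
    Skip,
    /// Exactly the final round: form the last checkpoint of the epoch.
    FinalCheckpoint,
    /// Below the final round, or no final round set: form a normal checkpoint.
    Ordinary,
}

fn classify(final_checkpoint_round: Option<u64>, round: u64) -> Decision {
    match final_checkpoint_round.map(|r| r.cmp(&round)) {
        Some(Ordering::Less) => Decision::Skip,
        Some(Ordering::Equal) => Decision::FinalCheckpoint,
        Some(Ordering::Greater) | None => Decision::Ordinary,
    }
}

fn main() {
    assert!(matches!(classify(Some(10), 11), Decision::Skip));
    assert!(matches!(classify(Some(10), 10), Decision::FinalCheckpoint));
    assert!(matches!(classify(Some(10), 9), Decision::Ordinary));
    assert!(matches!(classify(None, 42), Decision::Ordinary));
}
```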
