Skip to content

Commit

Permalink
[checkpoint] Do not make progress in passive code (MystenLabs#2710)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 27, 2022
1 parent ed3d6ad commit 12837a9
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 165 deletions.
13 changes: 2 additions & 11 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,16 +971,11 @@ impl AuthorityState {
.lock();
match &request.request_type {
CheckpointRequestType::LatestCheckpointProposal => {
checkpoint_store.handle_latest_proposal(self.committee.load().epoch, request)
checkpoint_store.handle_latest_proposal(request)
}
CheckpointRequestType::PastCheckpoint(seq) => {
checkpoint_store.handle_past_checkpoint(request.detail, *seq)
}
CheckpointRequestType::SetCertificate(cert, opt_contents) => checkpoint_store
.handle_checkpoint_certificate(cert, opt_contents, &self.committee.load()),
CheckpointRequestType::SetFragment(fragment) => {
checkpoint_store.handle_receive_fragment(fragment, &self.committee.load())
}
}
}

Expand Down Expand Up @@ -1105,11 +1100,7 @@ impl AuthorityState {
// Update the checkpointing mechanism
checkpoint
.lock()
.handle_internal_batch(
batch.batch.next_sequence_number,
&transactions,
&state.committee.load(),
)
.handle_internal_batch(batch.batch.next_sequence_number, &transactions)
.expect("Should see no errors updating the checkpointing mechanism.");
}
}
Expand Down
17 changes: 12 additions & 5 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub async fn checkpoint_process<A>(
// In either case try to upgrade the signed checkpoint to a certified one
// if possible
let result = {
state_checkpoints.lock().handle_checkpoint_certificate(
state_checkpoints.lock().process_checkpoint_certificate(
&checkpoint,
&None,
committee,
Expand All @@ -167,7 +167,7 @@ pub async fn checkpoint_process<A>(
.await
{
// Retry with contents
let _ = state_checkpoints.lock().handle_checkpoint_certificate(
let _ = state_checkpoints.lock().process_checkpoint_certificate(
&checkpoint,
&Some(contents),
committee,
Expand Down Expand Up @@ -201,6 +201,13 @@ pub async fn checkpoint_process<A>(
.await;
}

if let Err(err) = state_checkpoints
.lock()
.attempt_to_construct_checkpoint(committee)
{
warn!("Error attempting to construct checkpoint: {:?}", err);
}

// (5) Wait for a long long time.
let name = state_checkpoints.lock().name;
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
Expand Down Expand Up @@ -419,7 +426,7 @@ where
if let Err(err) =
checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &None, &net.committee)
.process_checkpoint_certificate(&past, &None, &net.committee)
{
warn!("Error handling certificate: {err:?}");
}
Expand All @@ -441,7 +448,7 @@ where
if let Err(err) =
checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &_contents, &net.committee)
.process_checkpoint_certificate(&past, &_contents, &net.committee)
{
warn!("Sync Err: {err:?}");
}
Expand Down Expand Up @@ -636,7 +643,7 @@ pub async fn diff_proposals<A>(
let proposer = fragment.proposer.authority();
let other = fragment.other.authority();
debug!("Send fragment: {proposer:?} -- {other:?}");
let _ = checkpoint_db.lock().handle_receive_fragment(
let _ = checkpoint_db.lock().submit_local_fragment_to_consensus(
&fragment,
&active_authority.state.committee.load(),
);
Expand Down
9 changes: 4 additions & 5 deletions crates/sui-core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,10 @@ impl crate::authority::AuthorityState {
// If a checkpointing service is present, register the batch with it
// to insert the transactions into future checkpoint candidates
if let Some(checkpoint) = &self.checkpoints {
if let Err(err) = checkpoint.lock().handle_internal_batch(
new_batch.batch.next_sequence_number,
&current_batch,
&self.committee.load(),
) {
if let Err(err) = checkpoint
.lock()
.handle_internal_batch(new_batch.batch.next_sequence_number, &current_batch)
{
error!("Checkpointing service error: {}", err);
}
}
Expand Down
85 changes: 24 additions & 61 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,8 @@ impl CheckpointStore {

pub fn handle_latest_proposal(
&mut self,
epoch: EpochId,
request: &CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
// Set a proposal if there is not one, and one could be set
// TODO: check some minimum time passed since the last one
// and only set after that time.
let _ = self.new_proposal(epoch);

// Try to load any latest proposal
let locals = self.get_locals();
let latest_checkpoint_proposal = &locals.current_proposal;
Expand Down Expand Up @@ -407,7 +401,7 @@ impl CheckpointStore {
// - check the new checkpoint contents do not include already checkpointed transactions.
// - check that the sequence is complete, ie no missing dependencies in the checkpoint or
// previous checkpoint for all transactions included.
// - create a cannonical causal order that is deterministic accross authorities and store
// - create a canonical causal order that is deterministic across authorities and store
// contents as such a list instead of a set.
// Probably we need access to the effects to do the above.

Expand Down Expand Up @@ -456,7 +450,6 @@ impl CheckpointStore {
&mut self,
next_sequence_number: TxSequenceNumber,
transactions: &[(TxSequenceNumber, ExecutionDigests)],
committee: &Committee,
) -> Result<(), SuiError> {
self.update_processed_transactions(transactions)?;

Expand All @@ -466,21 +459,12 @@ impl CheckpointStore {
new_locals.next_transaction_sequence = next_sequence_number;
self.set_locals(locals, new_locals)?;

// Attempt to move forward, as many times as we can
while self
.attempt_to_construct_checkpoint(committee)
.unwrap_or(false)
{}

Ok(())
}

// TODO: this function should submit the received fragment to the
// consensus algorithm for sequencing. It should also do some
// basic checks to not submit redundant information to the
// consensus, as well as to check it is the right node to
// submit to consensus.
pub fn handle_receive_fragment(
// TODO: this function should do some basic checks to avoid submitting redundant information to
// consensus, as well as check that this is the right node to submit to consensus.
pub fn submit_local_fragment_to_consensus(
&mut self,
fragment: &CheckpointFragment,
committee: &Committee,
Expand Down Expand Up @@ -522,10 +506,6 @@ impl CheckpointStore {
});
}

// TODO: Checks here that the fragment makes progress over the existing
// construction of components using the self.fragments table. This
// is an optimization for later.

let locals = self.get_locals();
if !locals.no_more_fragments {
// Send to consensus for sequencing.
Expand All @@ -534,18 +514,6 @@ impl CheckpointStore {
} else {
return Err(SuiError::from("No consensus sender configured"));
}
} else {
// Maybe the fragment we received allows us to complete the current checkpoint?
// Since we seem to be missing information to complete it (ie there is a checkpoint
// but we are not included in it.)
loop {
let construct = self.attempt_to_construct_checkpoint(committee);
// Exit if checkpoint construction leads to an error or returns false
// (ie no new checkpoint is created.)
if construct.is_err() || !construct.unwrap() {
break;
}
}
}

// NOTE: we should charge the node that sends this into consensus
Expand All @@ -563,80 +531,75 @@ impl CheckpointStore {
/// fragments should be provided in seq increasing order.
pub fn handle_internal_fragment<P: PendCertificateForExecution>(
&mut self,
_seq: ExecutionIndices,
_fragment: CheckpointFragment,
seq: ExecutionIndices,
fragment: CheckpointFragment,
committee: &Committee,
handle_pending_cert: &P,
) -> Result<(), FragmentInternalError> {
// Ensure we have not already processed this fragment.
if let Some((last_seq, _)) = self.fragments.iter().skip_to_last().next() {
if _seq <= last_seq {
if seq <= last_seq {
// We have already processed this fragment, just exit.
return Ok(());
}
}

// Check structure is correct and signatures verify
_fragment
fragment
.verify(committee)
.map_err(FragmentInternalError::Error)?;

// Schedule for execution all the certificates that are included here.
handle_pending_cert
.pending_execution(
_fragment
fragment
.certs
.iter()
.map(|(digest, cert)| (digest.transaction, cert.clone()))
.collect(),
)
.map_err(|_err| {
// There is a possibility this was not stored!
let fragment = _fragment.clone();
let fragment = fragment.clone();
FragmentInternalError::Retry(Box::new(fragment))
})?;

// Save the new fragment in the DB
self.fragments.insert(&_seq, &_fragment).map_err(|_err| {
self.fragments.insert(&seq, &fragment).map_err(|_err| {
// There is a possibility this was not stored!
let fragment = _fragment.clone();
let fragment = fragment.clone();
FragmentInternalError::Retry(Box::new(fragment))
})?;

// If the fragment contains us also save it in the list of local fragments
let next_sequence_number = self.next_checkpoint();
if *_fragment.proposer.summary.sequence_number() == next_sequence_number {
if _fragment.proposer.authority() == &self.name {
if *fragment.proposer.summary.sequence_number() == next_sequence_number {
if fragment.proposer.authority() == &self.name {
self.local_fragments
.insert(_fragment.other.authority(), &_fragment)
.insert(fragment.other.authority(), &fragment)
.map_err(|_err| {
// There is a possibility this was not stored!
let fragment = _fragment.clone();
let fragment = fragment.clone();
FragmentInternalError::Retry(Box::new(fragment))
})?;
}
if _fragment.other.authority() == &self.name {
if fragment.other.authority() == &self.name {
self.local_fragments
.insert(_fragment.proposer.authority(), &_fragment)
.insert(fragment.proposer.authority(), &fragment)
.map_err(|_err| {
// There is a possibility this was not stored!
let fragment = _fragment.clone();
let fragment = fragment.clone();
FragmentInternalError::Retry(Box::new(fragment))
})?;
}
}

// Attempt to move forward, as many times as we can
while self
.attempt_to_construct_checkpoint(committee)
.unwrap_or(false)
{}
Ok(())
}

/// Attempt to construct the next expected checkpoint, and return true if a new
/// checkpoint is created or false if it is not.
fn attempt_to_construct_checkpoint(
pub fn attempt_to_construct_checkpoint(
&mut self,
committee: &Committee,
) -> Result<bool, FragmentInternalError> {
Expand Down Expand Up @@ -783,14 +746,14 @@ impl CheckpointStore {
Ok(None)
}

/// Handles the submission of a full checkpoint externally, and stores
/// the certificate. It may be used to upload a certificate, or induce
/// the authority to catch up with the latest checkpoints.
/// Processes a checkpoint certificate that this validator just learned about.
/// Such a certificate may either be created locally based on a quorum of signed checkpoints,
/// or downloaded from other validators to sync local checkpoint state.
///
/// A cert without contents is only stored if we have already processed
/// the checkpoint internally. A cert with contents is processed as if
/// it came from the internal consensus.
pub fn handle_checkpoint_certificate(
pub fn process_checkpoint_certificate(
&mut self,
checkpoint: &CertifiedCheckpointSummary,
contents: &Option<CheckpointContents>,
Expand Down
Loading

0 comments on commit 12837a9

Please sign in to comment.