Skip to content

Commit

Permalink
[checkpoint] Adjust the active process sequence (MystenLabs#2756)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 29, 2022
1 parent b2dd701 commit 8010d67
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 55 deletions.
80 changes: 43 additions & 37 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,30 @@ pub async fn checkpoint_process<A>(

// sync_to_checkpoint only syncs to the checkpoint before the latest checkpoint.
// The latest checkpoint requires special handling (refer to the comments there).
if let Err(err) = update_latest_checkpoint(
let result = update_latest_checkpoint(
active_authority.state.name,
&net,
&state_checkpoints,
&checkpoint,
committee,
)
.await
{
warn!("Failed to update latest checkpoint: {:?}", err);
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
.await;
match result {
Err(err) => {
warn!("Failed to update latest checkpoint: {:?}", err);
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
Ok(true) => {
let name = state_checkpoints.lock().name;
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
debug!("{name:?} at checkpoint {next_checkpoint:?}");
tokio::time::sleep(timing.long_pause_between_checkpoints).await;
continue;
}
Ok(false) => {
// Nothing new.
}
}
}

Expand Down Expand Up @@ -220,13 +232,6 @@ pub async fn checkpoint_process<A>(
}
Ok(true) => (),
}

// (4) Wait for a long long time.
let name = state_checkpoints.lock().name;
let next_checkpoint = state_checkpoints.lock().next_checkpoint();

debug!("{name:?} at checkpoint {next_checkpoint:?}");
tokio::time::sleep(timing.long_pause_between_checkpoints).await;
}
}

Expand Down Expand Up @@ -419,35 +424,36 @@ async fn update_latest_checkpoint<A>(
state_checkpoints: &Arc<Mutex<CheckpointStore>>,
checkpoint: &CertifiedCheckpointSummary,
committee: &Committee,
) -> SuiResult
) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let result = {
state_checkpoints
.lock()
.process_checkpoint_certificate(checkpoint, &None, committee)
}; // unlock

if let Err(err) = result {
warn!("Cannot process checkpoint: {err:?}");
drop(err);

// One of the errors may be due to the fact that we do not have
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) = get_checkpoint_contents(self_name, net.clone(), checkpoint).await {
// Retry with contents
state_checkpoints.lock().process_checkpoint_certificate(
checkpoint,
&Some(contents),
committee,
)?;
let result = state_checkpoints
.lock()
.process_checkpoint_certificate(checkpoint, &None, committee);

match result {
Err(err) => {
warn!("Cannot process checkpoint: {err:?}");

// One of the errors may be due to the fact that we do not have
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) = get_checkpoint_contents(self_name, net.clone(), checkpoint).await
{
// Retry with contents
state_checkpoints.lock().process_checkpoint_certificate(
checkpoint,
&Some(contents),
committee,
)
} else {
Err(err)
}
}
Ok(b) => Ok(b),
}

Ok(())
}

/// Download all checkpoints that are not known to us
Expand Down
17 changes: 8 additions & 9 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,18 +754,20 @@ impl CheckpointStore {
/// A cert without contents is only stored if we have already processed
/// internally the checkpoint. A cert with contents is processed as if
/// it came from the internal consensus.
///
/// Returns whether a new cert is stored locally.
pub fn process_checkpoint_certificate(
&mut self,
checkpoint: &CertifiedCheckpointSummary,
contents: &Option<CheckpointContents>,
committee: &Committee,
) -> Result<CheckpointResponse, SuiError> {
) -> Result<bool, SuiError> {
// Get the record in our checkpoint database for this sequence number.
let current = self.checkpoints.get(checkpoint.summary.sequence_number())?;

match &current {
// If cert exists, do nothing (idempotent)
Some(AuthenticatedCheckpoint::Certified(_current_cert)) => {}
Some(AuthenticatedCheckpoint::Certified(_current_cert)) => Ok(false),
// If no such checkpoint is known, then return an error
// NOTE: a checkpoint must first be confirmed internally before an external
// certificate is registered.
Expand All @@ -777,8 +779,9 @@ impl CheckpointStore {
&AuthenticatedCheckpoint::Certified(checkpoint.clone()),
contents,
)?;
Ok(true)
} else {
return Err(SuiError::from("No checkpoint set at this sequence."));
Err(SuiError::from("No checkpoint set at this sequence."))
}
}
// In this case we have an internal signed checkpoint so we promote it to a
Expand All @@ -789,18 +792,14 @@ impl CheckpointStore {
checkpoint.summary.sequence_number(),
&AuthenticatedCheckpoint::Certified(checkpoint.clone()),
)?;
Ok(true)
}
Some(AuthenticatedCheckpoint::None) => {
// If we are here there was a bug? We never assign the None case
// to a stored value.
unreachable!();
}
};

Ok(CheckpointResponse {
info: AuthorityCheckpointInfo::Success,
detail: None,
})
}
}

// Helper read functions
Expand Down
12 changes: 3 additions & 9 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ fn set_get_checkpoint() {
let response_ckp = cps1
.process_checkpoint_certificate(&checkpoint_cert, &None, &committee)
.unwrap();
assert!(matches!(
response_ckp.info,
AuthorityCheckpointInfo::Success
));
assert!(response_ckp);

// Now we have a certified checkpoint
let response = cps1.handle_past_checkpoint(true, 0).unwrap();
Expand Down Expand Up @@ -636,10 +633,7 @@ fn set_get_checkpoint() {
let response_ckp = cps4
.process_checkpoint_certificate(&checkpoint_cert, &Some(transactions), &committee)
.unwrap();
assert!(matches!(
response_ckp.info,
AuthorityCheckpointInfo::Success
));
assert!(response_ckp);

// Now we have a certified checkpoint
let response = cps4.handle_past_checkpoint(true, 0).unwrap();
Expand Down Expand Up @@ -1722,7 +1716,7 @@ async fn checkpoint_messaging_flow() {
)
.unwrap();

assert!(matches!(response.info, AuthorityCheckpointInfo::Success));
assert!(response);
}
}

Expand Down

0 comments on commit 8010d67

Please sign in to comment.