Skip to content

Commit

Permalink
[checkpoint] Only clear proposal after we have stored a cert (MystenL…
Browse files Browse the repository at this point in the history
…abs#3425)

* [checkpoint] Only clear proposal after we have stored a cert

* fix tests
  • Loading branch information
lxfind authored Jul 25, 2022
1 parent 32c1962 commit 9ae4d1f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 75 deletions.
37 changes: 17 additions & 20 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,18 +274,13 @@ pub async fn checkpoint_process<A>(

// (3) Create a new proposal locally. This will also allow other validators
// to query the proposal.
// Only move to propose when we have the full checkpoint certificate
let sequence_number = state_checkpoints.lock().next_checkpoint();
if sequence_number > 0 {
// Check that we have the full certificate for the previous checkpoint.
// If not, we are not ready yet to make a proposal.
if !matches!(
state_checkpoints.lock().get_checkpoint(sequence_number - 1),
Ok(Some(AuthenticatedCheckpoint::Certified(..)))
) {
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}
// If we have already signed a new checkpoint locally, there is nothing to do.
if matches!(
state_checkpoints.lock().latest_stored_checkpoint(),
Some(AuthenticatedCheckpoint::Signed(..))
) {
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}

let proposal = state_checkpoints.lock().set_proposal(committee.epoch);
Expand All @@ -299,6 +294,10 @@ pub async fn checkpoint_process<A>(
.sum();

if weight < committee.quorum_threshold() {
debug!(
?weight,
"Not enough proposals to make progress on checkpoint creation"
);
// We don't have a quorum of proposals yet, we won't succeed making a checkpoint
// even if we try. Sleep and come back latter.
tokio::time::sleep(timing.delay_on_local_failure).await;
Expand Down Expand Up @@ -529,7 +528,7 @@ async fn update_latest_checkpoint<A>(
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let latest_local_checkpoint = state_checkpoints.lock().latest_stored_checkpoint()?;
let latest_local_checkpoint = state_checkpoints.lock().latest_stored_checkpoint();
enum Action {
Promote,
NewCert,
Expand Down Expand Up @@ -617,7 +616,7 @@ where
let net = active_authority.net.load();
let state = active_authority.state.clone();
// Get out last checkpoint
let latest_checkpoint = checkpoint_db.lock().latest_stored_checkpoint()?;
let latest_checkpoint = checkpoint_db.lock().latest_stored_checkpoint();
// We use the latest available authorities not the authorities that signed the checkpoint
// since these might be gone after the epoch they were active.
let available_authorities: BTreeSet<_> = latest_known_checkpoint
Expand Down Expand Up @@ -790,16 +789,14 @@ where
match augment_fragment_with_diff_transactions(active_authority, fragment).await {
Ok(fragment) => {
// On success send the fragment to consensus
let proposer = fragment.proposer.authority();
let other = fragment.other.authority();
debug!("Send fragment: {proposer:?} -- {other:?}");
let _ = checkpoint_db.lock().submit_local_fragment_to_consensus(
if let Err(err) = checkpoint_db.lock().submit_local_fragment_to_consensus(
&fragment,
&active_authority.state.committee.load(),
);
) {
warn!("Error submitting local fragment to consensus: {err:?}");
}
}
Err(err) => {
// TODO: some error occurred -- log it.
warn!("Error augmenting the fragment: {err:?}");
}
}
Expand Down
51 changes: 33 additions & 18 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ impl CheckpointStore {
if !locals.no_more_fragments {
// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
debug!("Send fragment: {} -- {}", self.name, other_name);
sender.send_to_consensus(fragment.clone())?;
} else {
return Err(SuiError::from("No consensus sender configured"));
Expand All @@ -539,6 +540,14 @@ impl CheckpointStore {
committee: &Committee,
handle_pending_cert: impl PendCertificateForExecution,
) -> Result<(), FragmentInternalError> {
let fragment_seq = fragment.proposer.summary.sequence_number;
debug!(
execution_index=?seq,
cp_seq=?fragment_seq,
"Fragment received from consensus. Proposal: {}, other: {}",
fragment.proposer.authority(),
fragment.other.authority(),
);
// Ensure we have not already processed this fragment.
if let Some((last_seq, _)) = self.fragments.iter().skip_to_last().next() {
if seq <= last_seq {
Expand All @@ -553,6 +562,7 @@ impl CheckpointStore {
.map_err(FragmentInternalError::Error)?;

// Schedule for execution all the certificates that are included here.
// TODO: We should not schedule a cert if it has already been executed.
handle_pending_cert
.add_pending_certificates(
fragment
Expand Down Expand Up @@ -717,6 +727,7 @@ impl CheckpointStore {
let locals = self.get_locals();
let mut new_locals = locals.as_ref().clone();
new_locals.no_more_fragments = true;
debug!("no_more_fragments is set");
self.set_locals(locals, new_locals)?;

Err(SuiError::from(
Expand All @@ -732,13 +743,14 @@ impl CheckpointStore {
) -> SuiResult {
checkpoint.verify(committee, None)?;
debug_assert!(matches!(
self.latest_stored_checkpoint()?,
self.latest_stored_checkpoint(),
Some(AuthenticatedCheckpoint::Signed(_))
));
let seq = checkpoint.summary.sequence_number();
self.checkpoints
.insert(seq, &AuthenticatedCheckpoint::Certified(checkpoint.clone()))?;
metrics.checkpoint_sequence_number.set(*seq as i64);
self.clear_proposal(*seq + 1)?;
Ok(())
}

Expand All @@ -764,9 +776,24 @@ impl CheckpointStore {
effects_store,
)?;
metrics.checkpoint_sequence_number.set(*seq as i64);
self.clear_proposal(*seq + 1)?;
Ok(())
}

fn clear_proposal(
&mut self,
new_expected_next_checkpoint: CheckpointSequenceNumber,
) -> SuiResult {
let locals = self.get_locals();

let mut new_locals = locals.as_ref().clone();
new_locals.current_proposal = None;
new_locals.proposal_next_transaction = None;
new_locals.no_more_fragments = false;
new_locals.next_checkpoint = new_expected_next_checkpoint;
self.set_locals(locals, new_locals)
}

// Helper read functions

/// Return the seq number of the next checkpoint.
Expand All @@ -780,15 +807,12 @@ impl CheckpointStore {
}

/// Get the latest stored checkpoint if there is one
pub fn latest_stored_checkpoint(
&mut self,
) -> Result<Option<AuthenticatedCheckpoint>, SuiError> {
Ok(self
.checkpoints
pub fn latest_stored_checkpoint(&mut self) -> Option<AuthenticatedCheckpoint> {
self.checkpoints
.iter()
.skip_to_last()
.next()
.map(|(_, ckp)| ckp))
.map(|(_, ckp)| ckp)
}

pub fn is_ready_to_start_epoch_change(&mut self) -> bool {
Expand Down Expand Up @@ -825,6 +849,7 @@ impl CheckpointStore {
};

let transactions = CheckpointContents::new(self.extra_transactions.keys());
let size = transactions.transactions.len();
let checkpoint_proposal = CheckpointProposal::new(
epoch,
checkpoint_sequence,
Expand All @@ -839,7 +864,7 @@ impl CheckpointStore {
new_locals.proposal_next_transaction = Some(next_local_tx_sequence);
self.set_locals(locals, new_locals)?;

info!(cp_seq=?checkpoint_sequence, "A new checkpoint proposal is created");
info!(cp_seq=?checkpoint_sequence, ?size, "A new checkpoint proposal is created");
Ok(checkpoint_proposal)
}

Expand Down Expand Up @@ -947,16 +972,6 @@ impl CheckpointStore {
// Write to the database.
batch.write()?;

// Clean up our proposal if any
let locals = self.get_locals();

let mut new_locals = locals.as_ref().clone();
new_locals.current_proposal = None;
new_locals.proposal_next_transaction = None;
new_locals.no_more_fragments = false;
new_locals.next_checkpoint = expected_seq + 1;
self.set_locals(locals, new_locals)?;

Ok(())
}

Expand Down
82 changes: 47 additions & 35 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ fn make_checkpoint_db() {
.unwrap();
assert_eq!(cps.checkpoint_contents.iter().count(), 1);
assert_eq!(cps.extra_transactions.iter().count(), 1);
assert_eq!(cps.next_checkpoint(), 1);

cps.update_processed_transactions(&[(6, t6)]).unwrap();
assert_eq!(cps.checkpoint_contents.iter().count(), 1);
Expand Down Expand Up @@ -523,49 +522,43 @@ fn latest_proposal() {

// --- TEST3 ---

// No valid checkpoint proposal condition...
assert!(cps1.get_locals().current_proposal.is_none());
// Proposals are not cleared until we have a cert.
assert!(cps1.get_locals().current_proposal.is_some());

let signed: Vec<_> = [
cps1.latest_stored_checkpoint().unwrap(),
cps2.latest_stored_checkpoint().unwrap(),
]
.into_iter()
.map(|a| match a {
AuthenticatedCheckpoint::Signed(s) => s,
_ => panic!("Unexpected type"),
})
.collect();
// We only need f+1 to make a cert. 2 is sufficient.
let cert = CertifiedCheckpointSummary::aggregate(signed, &committee).unwrap();
cps1.promote_signed_checkpoint_to_cert(&cert, &committee, &CheckpointMetrics::new_for_tests())
.unwrap();

let request = CheckpointRequest::latest(false);
let response = cps1.handle_latest_proposal(&request).expect("no errors");
assert!(response.detail.is_none());
// The proposal should have been cleared now.
assert!(matches!(
response.info,
AuthorityCheckpointInfo::Proposal { .. }
));
if let AuthorityCheckpointInfo::Proposal { current, previous } = response.info {
assert!(current.is_none());
assert!(matches!(
previous,
Some(AuthenticatedCheckpoint::Signed { .. })
));
}

// --- TEST 4 ---

// When details are needed, then return unexecuted transactions if there is no proposal
let request = CheckpointRequest::latest(true);
let response = cps1.handle_latest_proposal(&request).expect("no errors");
assert!(response.detail.is_none());

assert!(matches!(
response.info,
AuthorityCheckpointInfo::Proposal { .. }
AuthorityCheckpointInfo::Proposal {
current: None,
previous: Some(AuthenticatedCheckpoint::Certified { .. })
}
));
if let AuthorityCheckpointInfo::Proposal { current, previous } = response.info {
assert!(current.is_none());
assert!(matches!(
previous,
Some(AuthenticatedCheckpoint::Signed { .. })
));
}

// ---
cps1.update_processed_transactions(&[(6, t6)]).unwrap();

// Create a new proposal.
let _p1 = cps1.set_proposal(committee.epoch).unwrap();

// --- TEST 5 ---
// --- TEST 4 ---

// Get the full proposal with previous proposal
let request = CheckpointRequest::latest(true);
Expand All @@ -578,7 +571,7 @@ fn latest_proposal() {
assert!(current.is_some());
assert!(matches!(
previous,
Some(AuthenticatedCheckpoint::Signed { .. })
Some(AuthenticatedCheckpoint::Certified { .. })
));

let current_proposal = current.unwrap();
Expand Down Expand Up @@ -823,6 +816,25 @@ fn checkpoint_integration() {
TestCausalOrderPendCertNoop,
)
.is_ok());
// Turn the signed checkpoint to a cert. This is required to make progress.
let checkpoint = match cps.latest_stored_checkpoint().unwrap() {
AuthenticatedCheckpoint::Signed(s) => s.summary,
_ => unreachable!(),
};
let signatures: Vec<_> = keys
.iter()
.map(|key| {
let name = key.public().into();
SignedCheckpointSummary::new_from_summary(checkpoint.clone(), name, key)
})
.collect();
let cert = CertifiedCheckpointSummary::aggregate(signatures, &committee).unwrap();
cps.promote_signed_checkpoint_to_cert(
&cert,
&committee,
&CheckpointMetrics::new_for_tests(),
)
.unwrap();

// Loop invariant to ensure termination or error
assert_eq!(cps.get_locals().next_checkpoint, old_checkpoint + 1);
Expand Down Expand Up @@ -1424,8 +1436,8 @@ fn test_fragment_full_flow() {

// Two fragments for 5-6, and then 0-1, 1-2, 2-3, 3-4
assert_eq!(seq.next_transaction_index, 6);
// Advanced to next checkpoint
assert_eq!(cps0.next_checkpoint(), 1);
// We don't update next checkpoint yet until we get a cert.
assert_eq!(cps0.next_checkpoint(), 0);

let response = cps0
.handle_past_checkpoint(true, 0)
Expand Down Expand Up @@ -1453,7 +1465,7 @@ fn test_fragment_full_flow() {
// Two fragments for 5-6, and then 0-1, 1-2, 2-3, 3-4
assert_eq!(cps6.fragments.iter().count(), 6);
// Cannot advance to next checkpoint
assert_eq!(cps6.next_checkpoint(), 0);
assert!(cps6.latest_stored_checkpoint().is_none());
// But recording of fragments is closed

// However recording has stopped
Expand Down
4 changes: 2 additions & 2 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ async fn wait_for_advance_to_next_checkpoint(

match ok {
true => break,
false => tokio::time::sleep(Duration::from_secs(1)).await,
false => sleep(Duration::from_secs(1)).await,
}
cnt += 1;
assert!(cnt <= 40);
assert!(cnt <= 20);
}

// Ensure all authorities moved to the next checkpoint sequence number.
Expand Down

0 comments on commit 9ae4d1f

Please sign in to comment.