Skip to content

Commit

Permalink
[core] Set the no_more_fragments correctly (MystenLabs#2715)
Browse files Browse the repository at this point in the history
* Set the `no_more_fragments` correctly
* Added test

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jun 29, 2022
1 parent 71b914d commit 5c85bef
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ async fn pending_exec_storage_notify() {
.collect(),
)
.expect("Storage is ok");

tokio::task::yield_now().await;

// Wait for a notification (must arrive)
authority_state.database.wait_for_new_pending().await;
// get back the certificates
Expand Down
139 changes: 71 additions & 68 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,12 @@ impl CheckpointStore {

// We have a proposal, so let's try to reconstruct the checkpoint.
let next_sequence_number = self.next_checkpoint();
let locals = self.get_locals();

// Ok to unwrap because of the check above
let our_proposal = locals.current_proposal.as_ref().unwrap();

if let Ok(Some(contents)) = self.reconstruct_contents(committee) {
if let Ok(Some(contents)) = self.reconstruct_contents(committee, our_proposal) {
// Here we check, and ensure, all transactions are processed before we
// move to sign the checkpoint.
if !self
Expand Down Expand Up @@ -650,6 +654,7 @@ impl CheckpointStore {
pub fn reconstruct_contents(
&mut self,
committee: &Committee,
our_proposal: &CheckpointProposal,
) -> Result<Option<CheckpointContents>, FragmentInternalError> {
let next_sequence_number = self.next_checkpoint();
let fragments: Vec<_> = self
Expand All @@ -667,81 +672,79 @@ impl CheckpointStore {
.map_err(FragmentInternalError::Error)?;

if let Some(reconstructed) = _potential_checkpoint {
if let Some(proposal) = &self.get_locals().current_proposal {
// By definition the proposal and the new checkpoint must be in the
// same sequence number of checkpoint.
// A little argument about how the fragment -> checkpoint process is live
//
// A global checkpoint candidate must contain at least 2f+1 stake. And as
// a result of this f+1 stake will be from honest nodes that by definition
// must have submitted a proposal (because it is included!).
// So f+1 honest authorities will be able to reconstruct and sign the
// checkpoint. And all other authorities by asking all authorities will be
// able to get f+1 signatures and construct a checkpoint certificate.

// By definition the proposal and the new checkpoint must be in the
// same sequence number of checkpoint.

// Strategy 1 to reconstruct checkpoint -- we are included in it!

if reconstructed
.global
.authority_waypoints
.contains_key(&self.name)
{
// We are included in the proposal, so we can go ahead and construct the
// full checkpoint!
let mut contents = our_proposal.transactions.clone();
contents.transactions.extend(
// Add all items missing to reach the global waypoint
reconstructed.global.authority_waypoints[&self.name]
.items
.clone(),
);

return Ok(Some(contents));
}

// Strategy 1 to reconstruct checkpoint -- we are included in it!
// Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set

let local_links: HashSet<_> = self.local_fragments.keys().collect();
let checkpoint_keys: HashSet<_> = reconstructed
.global
.authority_waypoints
.keys()
.cloned()
.collect();

if let Some(auth) = local_links.intersection(&checkpoint_keys).next() {
let fragment = self
.local_fragments
.get(auth)
.map_err(|err| FragmentInternalError::Error(err.into()))?
.unwrap();

// Extract the diff
let diff = if fragment.proposer.authority() == &self.name {
fragment.diff
} else {
fragment.diff.swap()
};

if reconstructed
if let Ok(contents) = reconstructed
.global
.authority_waypoints
.contains_key(&self.name)
.checkpoint_items(&diff, our_proposal.transactions.transactions.clone())
{
// We are included in the proposal, so we can go ahead and construct the
// full checkpoint!
let mut contents = proposal.transactions.clone();
contents.transactions.extend(
// Add all items missing to reach the global waypoint
reconstructed.global.authority_waypoints[&self.name]
.items
.clone(),
);

let contents = CheckpointContents::new(contents.into_iter());
return Ok(Some(contents));
}

// Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set

let local_links: HashSet<_> = self.local_fragments.keys().collect();
let checkpoint_keys: HashSet<_> = reconstructed
.global
.authority_waypoints
.keys()
.cloned()
.collect();

if let Some(auth) = local_links.intersection(&checkpoint_keys).next() {
let fragment = self
.local_fragments
.get(auth)
.map_err(|err| FragmentInternalError::Error(err.into()))?
.unwrap();

// Extract the diff
let diff = if fragment.proposer.authority() == &self.name {
fragment.diff
} else {
fragment.diff.swap()
};

if let Ok(contents) = reconstructed
.global
.checkpoint_items(&diff, proposal.transactions.transactions.clone())
{
let contents = CheckpointContents::new(contents.into_iter());
return Ok(Some(contents));
}
}
} else {
// A little argument about how the fragment -> checkpoint process is live
//
// A global checkpoint candidate must contain at least 2f+1 stake. And as
// a result of this f+1 stake will be from honest nodes that by definition
// must have submitted a proposal (because it is included!).
// So f+1 honest authorities will be able to reconstruct and sign the
// checkpoint. And all other authorities by asking all authorities will be
// able to get f+1 signatures and construct a checkpoint certificate.

// Set `no_more_fragments` to true: we have all the fragments we need, but
// we just cannot reconstruct the contents.
let locals = self.get_locals();
let mut new_locals = locals.as_ref().clone();
new_locals.no_more_fragments = true;
self.set_locals(locals, new_locals)
.map_err(FragmentInternalError::Error)?;
}

// Set `no_more_fragments` to true: we have all the fragments we need, but
// we just cannot reconstruct the contents.
let locals = self.get_locals();
let mut new_locals = locals.as_ref().clone();
new_locals.no_more_fragments = true;
self.set_locals(locals, new_locals)
.map_err(FragmentInternalError::Error)?;

return Err(FragmentInternalError::Error(SuiError::from(
"Missing info to construct known checkpoint.",
)));
Expand Down
121 changes: 121 additions & 0 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1727,3 +1727,124 @@ async fn checkpoint_messaging_flow() {
assert!(matches!(response.info, AuthorityCheckpointInfo::Success));
}
}

// Checks when the `no_more_fragments` flag in the checkpoint locals is set.
//
// Scenario exercised below:
//   1. Every authority makes a proposal; fragments 0-1, 0-2 and 0-3 are built
//      from proposal p0 paired with p1, p2 and p3.
//   2. After fragment 0-1 is submitted, authority 0 does not have the flag set.
//   3. After fragment 0-2 is submitted, authority 0 constructs the checkpoint
//      and its flag is still not set.
//   4. Authority 3 fails to construct the checkpoint and its flag IS set.
//   5. Once authority 3 submits fragment 0-3 it can construct the checkpoint.
//
// The test runs on a single-threaded runtime with the tokio clock paused
// (`start_paused = true`), so the `sleep` calls below do not wait in real time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_no_more_fragments() {
let mut setup = checkpoint_tests_setup(5, Duration::from_millis(500), true).await;

// Check that the system is running by executing one transaction end to end.
let t = setup.transactions.pop().unwrap();
let (_cert, effects) = setup
.aggregator
.execute_transaction(&t)
.await
.expect("All ok.");

// The transaction must have executed successfully.
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));

// Wait for a batch to go through
// (We do not really wait, we jump there since real-time is not running).
tokio::time::sleep(Duration::from_secs(5)).await;

// Happy path checkpoint flow

// Step 1 -- get a bunch of proposals
let mut proposals = Vec::new();
// First make sure each authority creates a proposal.
for auth in &setup.authorities {
let proposal = auth
.checkpoint
.lock()
.new_proposal(setup.committee.epoch)
.unwrap();
proposals.push(proposal);
}

// `pop` takes from the back, so p3 is the LAST authority's proposal.
// NOTE(review): the names p0..p3 only line up with authorities 0..3 if
// `setup.authorities` holds exactly four entries -- TODO confirm against
// `checkpoint_tests_setup`.
let p3 = proposals.pop().unwrap();
let p2 = proposals.pop().unwrap();
let p1 = proposals.pop().unwrap();
let p0 = proposals.pop().unwrap();

// Fragments linking proposal 0 with each of the other three proposals.
let f01 = p0.fragment_with(&p1);
let f02 = p0.fragment_with(&p2);
let f03 = p0.fragment_with(&p3);

// Step 2 -- put in fragment 0-1; no checkpoint can be formed yet.

setup.authorities[0]
.checkpoint
.lock()
.submit_local_fragment_to_consensus(&f01, &setup.committee)
.unwrap();

// Give time to the receiving task to process (so that consensus can sequence fragments).
tokio::time::sleep(Duration::from_secs(1)).await;

// Still expecting more fragments: the flag must not be set on node 0.
assert!(
!setup.authorities[0]
.checkpoint
.lock()
.get_locals()
.no_more_fragments
);

// Step 3 -- put in fragment 0-2; now node 0 can form a checkpoint but not node 3.

setup.authorities[0]
.checkpoint
.lock()
.submit_local_fragment_to_consensus(&f02, &setup.committee)
.unwrap();

// Give time to the receiving task to process (so that consensus can sequence fragments).
tokio::time::sleep(Duration::from_secs(1)).await;

// Node 0 is able to construct the checkpoint.
assert!(setup.authorities[0]
.checkpoint
.lock()
.attempt_to_construct_checkpoint(&setup.committee)
.unwrap());

// Node 0 reconstructed the contents, so its `no_more_fragments` flag
// must still be unset.
assert!(
!setup.authorities[0]
.checkpoint
.lock()
.get_locals()
.no_more_fragments
);

// Step 4 -- node 3 cannot make one
assert!(!setup.authorities[3]
.checkpoint
.lock()
.attempt_to_construct_checkpoint(&setup.committee)
.unwrap());

// The failed attempt must have set `no_more_fragments` on node 3.
assert!(
setup.authorities[3]
.checkpoint
.lock()
.get_locals()
.no_more_fragments
);

// Step 5 -- now give node 3 a link (fragment 0-3) and it can make the checkpoint.
setup.authorities[3]
.checkpoint
.lock()
.submit_local_fragment_to_consensus(&f03, &setup.committee)
.unwrap();

// No sleep here: the construction attempt follows the submission directly.
assert!(setup.authorities[3]
.checkpoint
.lock()
.attempt_to_construct_checkpoint(&setup.committee)
.unwrap());
}

0 comments on commit 5c85bef

Please sign in to comment.