Skip to content

Commit

Permalink
[checkpoint] Restructure fragment making progress (MystenLabs#3559)
Browse files Browse the repository at this point in the history
* [checkpoint] Restructure fragment making progress

* Address feedback
  • Loading branch information
lxfind authored Jul 29, 2022
1 parent 3566707 commit 4ff3573
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 111 deletions.
215 changes: 114 additions & 101 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ impl Default for CheckpointProcessControl {
CheckpointProcessControl {
delay_on_quorum_failure: Duration::from_secs(10),
delay_on_local_failure: Duration::from_secs(3),
long_pause_between_checkpoints: Duration::from_secs(60),
long_pause_between_checkpoints: Duration::from_secs(120),
timeout_until_quorum: Duration::from_secs(60),
extra_time_after_quorum: Duration::from_millis(200),
consensus_delay_estimate: Duration::from_secs(1),
// TODO: Optimize this.
// https://github.com/MystenLabs/sui/issues/3619.
consensus_delay_estimate: Duration::from_secs(2),
per_other_authority_delay: Duration::from_secs(30),
epoch_change_retry_delay: Duration::from_millis(100),
}
Expand All @@ -100,7 +102,6 @@ pub struct CheckpointMetrics {
pub checkpoint_sequence_number: IntGauge,
checkpoints_signed: IntCounter,
checkpoint_frequency: Histogram,
checkpoint_num_fragments_sent: Histogram,
}

impl CheckpointMetrics {
Expand All @@ -124,12 +125,6 @@ impl CheckpointMetrics {
registry,
)
.unwrap(),
checkpoint_num_fragments_sent: register_histogram_with_registry!(
"checkpoint_num_fragments_sent",
"Number of fragments sent to consensus before this validator is able to make a checkpoint",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -195,6 +190,10 @@ pub async fn checkpoint_process<A>(
// while we do sync. We are in any case not in a position to make valuable
// proposals.
if let Some(checkpoint) = highest_checkpoint {
debug!(
"Highest Checkpoint Certificate from the network: {}",
checkpoint
);
// Check if there are more historic checkpoints to catch up with
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
// First sync until before the latest checkpoint. We will special
Expand Down Expand Up @@ -294,7 +293,6 @@ pub async fn checkpoint_process<A>(
&my_proposal,
committee,
timing.consensus_delay_estimate,
&metrics,
)
.await
{
Expand Down Expand Up @@ -462,10 +460,6 @@ where
}
});

debug!(
"Highest Checkpoint Certificate from the network: {:?}",
highest_certificate_cert
);
Ok(highest_certificate_cert)
}

Expand Down Expand Up @@ -682,120 +676,139 @@ pub async fn create_fragments_and_make_checkpoint<A>(
my_proposal: &CheckpointProposal,
committee: &Committee,
consensus_delay_estimate: Duration,
metrics: &CheckpointMetrics,
) -> bool
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Pick another authority, get their proposal, and submit it to consensus
// Exit when we have a checkpoint proposal.
let mut available_authorities: BTreeSet<AuthorityName> = committee
.voting_rights
.iter()
.map(|(name, _)| *name)
.collect();
available_authorities.remove(&active_authority.state.name); // remove ourselves
let mut available_authorities = committee.shuffle_by_stake(None, None);
// Remove ourselves and all validators that we have already diffed with.
let already_fragmented = checkpoint_db.lock().validators_already_fragmented_with();
// TODO: We can also use AuthorityHealth to pick healthy authorities first.
available_authorities
.retain(|name| name != &active_authority.state.name && !already_fragmented.contains(name));
debug!(
fragmented_count=?already_fragmented.len(),
to_be_fragmented_count=?available_authorities.len(),
"Going through remaining validators to generate fragments",
);

let next_checkpoint_sequence_number = checkpoint_db.lock().next_checkpoint();
let mut fragments_num = 0;
let mut index = 0;

loop {
// Always try to construct checkpoint first. This gives a chance to construct checkpoint
// even when `available_authorities` is empty already.
let result = checkpoint_db
.lock()
.attempt_to_construct_checkpoint(active_authority.state.database.clone(), committee);

match result {
Err(err) => {
// We likely don't have enough fragments. Fall through to send more fragments.
debug!(
?next_checkpoint_sequence_number,
num_proposals_processed=?index,
"Failed to construct checkpoint: {:?}",
err
);
}
Ok(()) => {
// A new checkpoint has been made.
return true;
}
}

if checkpoint_db.lock().get_locals().no_more_fragments {
// Sending more fragments won't help anymore.
break;
}

for authority in committee.shuffle_by_stake(None, None).iter() {
// We have ran out of authorities?
if available_authorities.is_empty() {
if index == available_authorities.len() {
// We have created as many fragments as possible, so exit.
break;
}
if !available_authorities.remove(authority) {
continue;
}
let authority = available_authorities[index];
index += 1;

// Get a client
let client = active_authority.net.load().authority_clients[authority].clone();
let client = active_authority.net.load().authority_clients[&authority].clone();

if let Ok(response) = client
match client
.handle_checkpoint(CheckpointRequest::proposal(true))
.await
{
if let AuthorityCheckpointInfo::CheckpointProposal {
proposal,
prev_cert,
} = &response.info
{
// Check if there is a latest checkpoint
if let Some(prev) = prev_cert {
if prev.summary.sequence_number >= next_checkpoint_sequence_number {
// We are now behind, stop the process
break;
}
}

// For some reason the proposal is empty?
if proposal.is_none() || response.detail.is_none() {
continue;
}

// Check the proposal is also for the same checkpoint sequence number
if &proposal.as_ref().unwrap().summary.sequence_number
!= my_proposal.sequence_number()
Ok(response) => {
if let AuthorityCheckpointInfo::CheckpointProposal {
proposal,
prev_cert,
} = &response.info
{
// Target validator may be byzantine or behind, ignore it.
continue;
}

let other_proposal = CheckpointProposal::new_from_signed_proposal_summary(
proposal.as_ref().unwrap().clone(),
response.detail.unwrap(),
);

let fragment = my_proposal.fragment_with(&other_proposal);

// We need to augment the fragment with the missing transactions
match augment_fragment_with_diff_transactions(active_authority, fragment).await {
Ok(fragment) => {
// On success send the fragment to consensus
if let Err(err) = checkpoint_db.lock().submit_local_fragment_to_consensus(
&fragment,
&active_authority.state.committee.load(),
) {
warn!("Error submitting local fragment to consensus: {err:?}");
// Check if there is a latest checkpoint
if let Some(prev) = prev_cert {
if prev.summary.sequence_number >= next_checkpoint_sequence_number {
// We are now behind, stop the process
debug!(
latest_cp_cert_seq=?prev.summary.sequence_number,
expected_cp_seq=?next_checkpoint_sequence_number,
"We are behind, abort checkpoint construction process",
);
break;
}
}
Err(err) => {
warn!("Error augmenting the fragment: {err:?}");
}
}

fragments_num += 1;

let result = checkpoint_db.lock().attempt_to_construct_checkpoint(
active_authority.state.database.clone(),
committee,
);
// For some reason the proposal is empty?
if proposal.is_none() || response.detail.is_none() {
continue;
}

match result {
Err(err) => {
// We likely don't have enough fragments, keep trying.
debug!(
?next_checkpoint_sequence_number,
?fragments_num,
"Failed to construct checkpoint: {:?}",
err
);
// TODO: here we should really wait until the fragment is sequenced, otherwise
// we would be going ahead and sequencing more fragments that may not be
// needed. For the moment we just linearly back-off.
tokio::time::sleep(fragments_num * consensus_delay_estimate).await;
// Check the proposal is also for the same checkpoint sequence number
if &proposal.as_ref().unwrap().summary.sequence_number
!= my_proposal.sequence_number()
{
// Target validator may be byzantine or behind, ignore it.
continue;
}
Ok(()) => {
// A new checkpoint has been made.
metrics
.checkpoint_num_fragments_sent
.observe(fragments_num as f64);
return true;

let other_proposal = CheckpointProposal::new_from_signed_proposal_summary(
proposal.as_ref().unwrap().clone(),
response.detail.unwrap(),
);

let fragment = my_proposal.fragment_with(&other_proposal);

// We need to augment the fragment with the missing transactions
match augment_fragment_with_diff_transactions(active_authority, fragment).await
{
Ok(fragment) => {
// On success send the fragment to consensus
if let Err(err) =
checkpoint_db.lock().submit_local_fragment_to_consensus(
&fragment,
&active_authority.state.committee.load(),
)
{
warn!("Error submitting local fragment to consensus: {err:?}");
}
// TODO: here we should really wait until the fragment is sequenced, otherwise
// we would be going ahead and sequencing more fragments that may not be
// needed. For the moment we just rely on linearly back-off.
// https://github.com/MystenLabs/sui/issues/3619.
tokio::time::sleep(consensus_delay_estimate.mul_f64(index as f64))
.await;
}
Err(err) => {
warn!("Error augmenting the fragment: {err:?}");
}
}
}
}
Err(err) => {
warn!(
"Error querying checkpoint proposal from validator {}: {:?}",
authority, err
);
}
}
}
false
Expand Down
22 changes: 12 additions & 10 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) mod checkpoint_tests;
use narwhal_executor::ExecutionIndices;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::{collections::HashSet, path::Path, sync::Arc};
use sui_storage::default_db_options;
use sui_types::messages_checkpoint::CheckpointProposal;
Expand Down Expand Up @@ -508,15 +509,12 @@ impl CheckpointStore {
});
}

let locals = self.get_locals();
if !locals.no_more_fragments {
// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
debug!("Send fragment: {} -- {}", self.name, other_name);
sender.send_to_consensus(fragment.clone())?;
} else {
return Err(SuiError::from("No consensus sender configured"));
}
// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
debug!("Send fragment: {} -- {}", self.name, other_name);
sender.send_to_consensus(fragment.clone())?;
} else {
return Err(SuiError::from("No consensus sender configured"));
}

// NOTE: we should charge the node that sends this into consensus
Expand All @@ -540,7 +538,7 @@ impl CheckpointStore {
debug!(
execution_index=?seq,
cp_seq=?fragment_seq,
"Fragment received from consensus. Proposal: {}, other: {}",
"Fragment received from consensus. Proposer: {}, Other: {}",
fragment.proposer.authority(),
fragment.other.authority(),
);
Expand Down Expand Up @@ -821,6 +819,10 @@ impl CheckpointStore {
next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1
}

pub fn validators_already_fragmented_with(&mut self) -> BTreeSet<AuthorityName> {
self.local_fragments.iter().map(|(name, _)| name).collect()
}

// Helper write functions

/// Set the next checkpoint proposal.
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-core/src/checkpoints/reconstruction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::max;
use std::collections::{BTreeMap, HashMap, VecDeque};
use tracing::debug;

use sui_types::base_types::ExecutionDigests;
use sui_types::committee::StakeUnit;
Expand Down Expand Up @@ -45,6 +47,8 @@ impl FragmentReconstruction {
let mut proposals: HashMap<AuthorityName, CheckpointProposalSummary> = HashMap::new();
let mut extra_transactions = BTreeMap::new();

let total_fragments_count = fragments.len();
let mut max_weight_seen = 0;
for frag in fragments {
// Double check we have only been given waypoints for the correct sequence number
debug_assert!(frag.proposer.summary.sequence_number == seq);
Expand Down Expand Up @@ -74,6 +78,7 @@ impl FragmentReconstruction {

// Merge the link.
let (top, weight) = span.merge(n1, n2);
max_weight_seen = max(max_weight_seen, weight);

// We have found a connected component larger than the 2/3 threshold
if weight >= committee.quorum_threshold() {
Expand Down Expand Up @@ -110,6 +115,12 @@ impl FragmentReconstruction {
});
}
}
debug!(
?max_weight_seen,
?total_fragments_count,
num_fragments_used = fragments_used.len(),
"Unable to construct a checkpoint after using all fragments",
);

// If we run out of candidates with no checkpoint, there is no
// checkpoint yet.
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-types/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,16 @@ impl Hash for AuthoritySignInfo {
}
}

impl Display for AuthoritySignInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"AuthoritySignInfo {{ epoch: {:?}, authority: {} }}",
self.epoch, self.authority,
)
}
}

impl PartialEq for AuthoritySignInfo {
fn eq(&self, other: &Self) -> bool {
// We do not compare the signature, because there can be multiple
Expand Down
Loading

0 comments on commit 4ff3573

Please sign in to comment.