Skip to content

Commit

Permalink
Split authenticated checkpoint and proposal request (MystenLabs#3536)
Browse files Browse the repository at this point in the history
* Remove checkpoint response success case

* Add separate checkpoint and proposal request types

* Remove Past checkpoint request type

* Replace proposal request

* Fix test failure
  • Loading branch information
lxfind authored Jul 27, 2022
1 parent 936c5e3 commit abe57fc
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 297 deletions.
8 changes: 4 additions & 4 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,11 @@ impl AuthorityState {
})?
.lock();
match &request.request_type {
CheckpointRequestType::LatestCheckpointProposal => {
checkpoint_store.handle_latest_proposal(request)
CheckpointRequestType::AuthenticatedCheckpoint(seq) => {
checkpoint_store.handle_authenticated_checkpoint(seq, request.detail)
}
CheckpointRequestType::PastCheckpoint(seq) => {
checkpoint_store.handle_past_checkpoint(request.detail, *seq)
CheckpointRequestType::CheckpointProposal => {
checkpoint_store.handle_proposal(request.detail)
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ use gossip::{gossip_process, node_sync_process};
pub mod checkpoint_driver;
use crate::authority_active::checkpoint_driver::CheckpointMetrics;
use crate::epoch::reconfiguration::Reconfigurable;
use checkpoint_driver::{
checkpoint_process, get_latest_proposal_and_checkpoint_from_all, sync_to_checkpoint,
};
use checkpoint_driver::{checkpoint_process, get_latest_checkpoint_from_all, sync_to_checkpoint};

pub mod execution_driver;

Expand Down Expand Up @@ -261,7 +259,7 @@ where
// TODO: fullnode should not get proposals
// TODO: potentially move get_latest_checkpoint_from_all and
// sync_to_checkpoint out of checkpoint_driver
let (checkpoint_summary, _) = get_latest_proposal_and_checkpoint_from_all(
let checkpoint_summary = get_latest_checkpoint_from_all(
self.net(),
checkpoint_process_control.extra_time_after_quorum,
checkpoint_process_control.timeout_until_quorum,
Expand Down
155 changes: 59 additions & 96 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use sui_types::{
messages_checkpoint::{
AuthenticatedCheckpoint, AuthorityCheckpointInfo, CertifiedCheckpointSummary,
CheckpointContents, CheckpointDigest, CheckpointFragment, CheckpointProposal,
CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
SignedCheckpointProposalSummary, SignedCheckpointSummary,
CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber, SignedCheckpointSummary,
},
};
use tokio::time::Instant;
Expand Down Expand Up @@ -170,17 +169,17 @@ pub async fn checkpoint_process<A>(
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// (1) Get the latest summaries and proposals
// (1) Get the latest checkpoint cert from the network.
// TODO: This may not work if we are many epochs behind: we won't be able to download
// from the current network. We will need to consolidate sync implementation.
let state_of_world = get_latest_proposal_and_checkpoint_from_all(
let highest_checkpoint = get_latest_checkpoint_from_all(
net.clone(),
timing.extra_time_after_quorum,
timing.timeout_until_quorum,
)
.await;

let (checkpoint, proposals) = match state_of_world {
let highest_checkpoint = match highest_checkpoint {
Ok(s) => s,
Err(err) => {
warn!("Cannot get a quorum of checkpoint information: {:?}", err);
Expand All @@ -195,7 +194,7 @@ pub async fn checkpoint_process<A>(
// It's ok that nothing else goes on in terms of the active checkpoint logic
// while we do sync. We are in any case not in a position to make valuable
// proposals.
if let Some(checkpoint) = checkpoint {
if let Some(checkpoint) = highest_checkpoint {
// Check if there are more historic checkpoints to catch up with
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
// First sync until before the latest checkpoint. We will special
Expand Down Expand Up @@ -285,35 +284,14 @@ pub async fn checkpoint_process<A>(

let proposal = state_checkpoints.lock().set_proposal(committee.epoch);

// (4) Check if we need to advance to the next checkpoint, in case >2/3
// have a proposal out. If so we start creating and injecting fragments
// into the consensus protocol to make the new checkpoint.
let weight: StakeUnit = proposals
.iter()
.map(|(auth, _)| committee.weight(auth))
.sum();

if weight < committee.quorum_threshold() {
debug!(
?weight,
"Not enough proposals to make progress on checkpoint creation"
);
// We don't have a quorum of proposals yet, we won't succeed making a checkpoint
// even if we try. Sleep and come back later.
tokio::time::sleep(timing.delay_on_local_failure).await;
continue;
}

// (5) Now we try to create fragments and construct checkpoint.
// TODO: Restructure the fragment making process.
match proposal {
Ok(my_proposal) => {
if create_fragments_and_make_checkpoint(
active_authority,
state_checkpoints.clone(),
&my_proposal,
// We use the list of validators that responded with a proposal
// to download proposal details.
proposals.into_iter().map(|(name, _)| name).collect(),
committee,
timing.consensus_delay_estimate,
&metrics,
Expand All @@ -322,6 +300,9 @@ pub async fn checkpoint_process<A>(
{
info!(cp_seq=?my_proposal.sequence_number(), "A new checkpoint is created and signed locally");
metrics.checkpoints_signed.inc();
} else {
debug!("Failed to make checkpoint after going through all available proposals");
tokio::time::sleep(timing.delay_on_local_failure).await;
}
}
Err(err) => {
Expand All @@ -336,31 +317,22 @@ pub async fn checkpoint_process<A>(
}
}

/// Reads the latest checkpoint / proposal info from all validators
/// and extracts the latest checkpoint as well as the set of proposals
pub async fn get_latest_proposal_and_checkpoint_from_all<A>(
/// Obtain the highest checkpoint certificate from all validators.
/// It's done by querying the latest authenticated checkpoints from a quorum of validators.
/// If we get a quorum of signed checkpoints of the same sequence number, form a cert on the fly.
pub async fn get_latest_checkpoint_from_all<A>(
net: Arc<AuthorityAggregator<A>>,
timeout_after_quorum: Duration,
timeout_until_quorum: Duration,
) -> Result<
(
Option<CertifiedCheckpointSummary>,
Vec<(AuthorityName, SignedCheckpointProposalSummary)>,
),
SuiError,
>
) -> Result<Option<CertifiedCheckpointSummary>, SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
#[derive(Default)]
struct CheckpointSummaries {
good_weight: StakeUnit,
bad_weight: StakeUnit,
responses: Vec<(
AuthorityName,
Option<SignedCheckpointProposalSummary>,
Option<AuthenticatedCheckpoint>,
)>,
responses: Vec<(AuthorityName, Option<AuthenticatedCheckpoint>)>,
errors: Vec<(AuthorityName, SuiError)>,
}
let initial_state = CheckpointSummaries::default();
Expand All @@ -373,18 +345,18 @@ where
Box::pin(async move {
// Request and return an error if any
client
.handle_checkpoint(CheckpointRequest::latest(false))
.handle_checkpoint(CheckpointRequest::authenticated(None, false))
.await
})
},
|mut state, name, weight, result| {
Box::pin(async move {
if let Ok(CheckpointResponse {
info: AuthorityCheckpointInfo::Proposal { current, previous },
info: AuthorityCheckpointInfo::AuthenticatedCheckpoint(checkpoint),
..
}) = result
{
state.responses.push((name, current, previous));
state.responses.push((name, checkpoint));
state.good_weight += weight;
} else {
state.bad_weight += weight;
Expand Down Expand Up @@ -428,7 +400,7 @@ where
// Extract the highest checkpoint cert returned.
let mut highest_certificate_cert: Option<CertifiedCheckpointSummary> = None;
for state in &final_state.responses {
if let Some(AuthenticatedCheckpoint::Certified(cert)) = &state.2 {
if let Some(AuthenticatedCheckpoint::Certified(cert)) = &state.1 {
if let Some(old_cert) = &highest_certificate_cert {
if cert.summary.sequence_number > old_cert.summary.sequence_number {
highest_certificate_cert = Some(cert.clone());
Expand All @@ -445,32 +417,29 @@ where
(CheckpointSequenceNumber, CheckpointDigest),
Vec<(AuthorityName, SignedCheckpointSummary)>,
> = BTreeMap::new();
final_state
.responses
.iter()
.for_each(|(auth, _proposal, checkpoint)| {
if let Some(AuthenticatedCheckpoint::Signed(signed)) = checkpoint {
// We are interested in this signed checkpoint only if it is
// newer than the highest known cert checkpoint.
if let Some(newest_checkpoint) = &highest_certificate_cert {
if newest_checkpoint.summary.sequence_number >= signed.summary.sequence_number {
return;
}
final_state.responses.iter().for_each(|(auth, checkpoint)| {
if let Some(AuthenticatedCheckpoint::Signed(signed)) = checkpoint {
// We are interested in this signed checkpoint only if it is
// newer than the highest known cert checkpoint.
if let Some(newest_checkpoint) = &highest_certificate_cert {
if newest_checkpoint.summary.sequence_number >= signed.summary.sequence_number {
return;
}

// Collect signed checkpoints by sequence number and digest.
partial_checkpoints
.entry((signed.summary.sequence_number, signed.summary.digest()))
.or_insert_with(Vec::new)
.push((*auth, signed.clone()));
}
});

// Collect signed checkpoints by sequence number and digest.
partial_checkpoints
.entry((signed.summary.sequence_number, signed.summary.digest()))
.or_insert_with(Vec::new)
.push((*auth, signed.clone()));
}
});

// We use a BTreeMap here to ensure we iterate in increasing order of checkpoint
// sequence numbers. If we find a valid checkpoint we are sure this is the highest.
partial_checkpoints
.iter()
.for_each(|((_seq, _digest), signed)| {
.for_each(|((seq, _digest), signed)| {
let weight: StakeUnit = signed
.iter()
.map(|(auth, _)| net.committee.weight(auth))
Expand All @@ -487,31 +456,17 @@ where
&net.committee,
);
if let Ok(cert) = certificate {
debug!(cp_seq=?seq, "A checkpoint certificate is formed from the network");
highest_certificate_cert = Some(cert);
}
}
});

let next_proposal_sequence_number = highest_certificate_cert
.as_ref()
.map(|cert| cert.summary.sequence_number + 1)
.unwrap_or(0);

// Collect proposals
let proposals: Vec<_> = final_state
.responses
.iter()
.filter_map(|(auth, proposal, _checkpoint)| {
if let Some(p) = proposal {
if p.summary.sequence_number == next_proposal_sequence_number {
return Some((*auth, p.clone()));
}
}
None
})
.collect();

Ok((highest_certificate_cert, proposals))
debug!(
"Highest Checkpoint Certificate from the network: {:?}",
highest_certificate_cert
);
Ok(highest_certificate_cert)
}

/// The latest certified checkpoint can either be a checkpoint downloaded from another validator,
Expand Down Expand Up @@ -708,7 +663,8 @@ where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
net.get_certified_checkpoint(
&CheckpointRequest::past(sequence_number, contents),
sequence_number,
contents,
available_authorities,
// Loop forever until we get the cert from someone.
None,
Expand All @@ -724,7 +680,6 @@ pub async fn create_fragments_and_make_checkpoint<A>(
active_authority: &ActiveAuthority<A>,
checkpoint_db: Arc<Mutex<CheckpointStore>>,
my_proposal: &CheckpointProposal,
mut available_authorities: BTreeSet<AuthorityName>,
committee: &Committee,
consensus_delay_estimate: Duration,
metrics: &CheckpointMetrics,
Expand All @@ -734,7 +689,11 @@ where
{
// Pick another authority, get their proposal, and submit it to consensus
// Exit when we have a checkpoint proposal.

let mut available_authorities: BTreeSet<AuthorityName> = committee
.voting_rights
.iter()
.map(|(name, _)| *name)
.collect();
available_authorities.remove(&active_authority.state.name); // remove ourselves

let next_checkpoint_sequence_number = checkpoint_db.lock().next_checkpoint();
Expand All @@ -754,33 +713,37 @@ where
let client = active_authority.net.load().authority_clients[authority].clone();

if let Ok(response) = client
.handle_checkpoint(CheckpointRequest::latest(true))
.handle_checkpoint(CheckpointRequest::proposal(true))
.await
{
if let AuthorityCheckpointInfo::Proposal { current, previous } = &response.info {
if let AuthorityCheckpointInfo::CheckpointProposal {
proposal,
prev_cert,
} = &response.info
{
// Check if there is a latest checkpoint
if let Some(AuthenticatedCheckpoint::Certified(prev)) = previous {
if let Some(prev) = prev_cert {
if prev.summary.sequence_number >= next_checkpoint_sequence_number {
// We are now behind, stop the process
break;
}
}

// For some reason the proposal is empty?
if current.is_none() || response.detail.is_none() {
if proposal.is_none() || response.detail.is_none() {
continue;
}

// Check the proposal is also for the same checkpoint sequence number
if &current.as_ref().unwrap().summary.sequence_number
if &proposal.as_ref().unwrap().summary.sequence_number
!= my_proposal.sequence_number()
{
// Target validator could be byzantine, just ignore it.
// Target validator may be byzantine or behind, ignore it.
continue;
}

let other_proposal = CheckpointProposal::new_from_signed_proposal_summary(
current.as_ref().unwrap().clone(),
proposal.as_ref().unwrap().clone(),
response.detail.unwrap(),
);

Expand Down
14 changes: 9 additions & 5 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use sui_types::committee::StakeUnit;
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, timeout};

use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::TapFallible;

const OBJECT_DOWNLOAD_CHANNEL_BOUND: usize = 1024;
Expand Down Expand Up @@ -1726,23 +1727,26 @@ where

pub async fn get_certified_checkpoint(
&self,
request: &CheckpointRequest,
sequence_number: CheckpointSequenceNumber,
request_contents: bool,
// authorities known to have the checkpoint we are requesting.
authorities: &BTreeSet<AuthorityName>,
timeout_total: Option<Duration>,
) -> SuiResult<(CertifiedCheckpointSummary, Option<CheckpointContents>)> {
let request = CheckpointRequest::authenticated(Some(sequence_number), request_contents);
self.quorum_once_with_timeout(
None,
Some(authorities),
|_, client| {
let r = request.clone();
Box::pin(async move {
let resp = client.handle_checkpoint(request.clone()).await?;
let resp = client.handle_checkpoint(r).await?;

if let CheckpointResponse {
info:
AuthorityCheckpointInfo::Past(Some(AuthenticatedCheckpoint::Certified(
past,
))),
AuthorityCheckpointInfo::AuthenticatedCheckpoint(Some(
AuthenticatedCheckpoint::Certified(past),
)),
detail,
} = resp
{
Expand Down
Loading

0 comments on commit abe57fc

Please sign in to comment.