Skip to content

Commit

Permalink
Refactor authenticated checkpoint data structures (MystenLabs#2635)
Browse files Browse the repository at this point in the history
* Refactor authenticated checkpoint data structures

* Remove SignedCheckpointProposal

* Rename fields
  • Loading branch information
lxfind authored Jun 24, 2022
1 parent c935251 commit 88abe65
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 214 deletions.
67 changes: 31 additions & 36 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use sui_types::{
error::SuiError,
messages::{CertifiedTransaction, ConfirmationTransaction, TransactionInfoRequest},
messages_checkpoint::{
AuthenticatedCheckpoint, AuthorityCheckpointInfo, CertifiedCheckpoint, CheckpointContents,
CheckpointDigest, CheckpointFragment, CheckpointRequest, CheckpointResponse,
CheckpointSequenceNumber, SignedCheckpoint, SignedCheckpointProposal,
AuthenticatedCheckpoint, AuthorityCheckpointInfo, CertifiedCheckpointSummary,
CheckpointContents, CheckpointDigest, CheckpointFragment, CheckpointRequest,
CheckpointResponse, CheckpointSequenceNumber, SignedCheckpointSummary,
},
};
use tokio::time::timeout;
Expand Down Expand Up @@ -126,7 +126,7 @@ pub async fn checkpoint_process<A>(
if let Some(checkpoint) = checkpoint {
// Check if there are more historic checkpoints to catch up with
let next_checkpoint = state_checkpoints.lock().next_checkpoint();
if next_checkpoint < checkpoint.checkpoint.sequence_number {
if next_checkpoint < checkpoint.summary.sequence_number {
// TODO log error
if let Err(err) = sync_to_checkpoint(
active_authority.state.name,
Expand Down Expand Up @@ -234,8 +234,8 @@ pub async fn get_latest_proposal_and_checkpoint_from_all<A>(
timeout_until_quorum: Duration,
) -> Result<
(
Option<CertifiedCheckpoint>,
Vec<(AuthorityName, SignedCheckpointProposal)>,
Option<CertifiedCheckpointSummary>,
Vec<(AuthorityName, SignedCheckpointSummary)>,
),
SuiError,
>
Expand All @@ -248,7 +248,7 @@ where
bad_weight: StakeUnit,
responses: Vec<(
AuthorityName,
Option<SignedCheckpointProposal>,
Option<SignedCheckpointSummary>,
AuthenticatedCheckpoint,
)>,
errors: Vec<(AuthorityName, SuiError)>,
Expand Down Expand Up @@ -316,11 +316,11 @@ where
.await?;

// Extract the highest checkpoint cert returned.
let mut highest_certificate_cert: Option<CertifiedCheckpoint> = None;
let mut highest_certificate_cert: Option<CertifiedCheckpointSummary> = None;
for state in &final_state.responses {
if let AuthenticatedCheckpoint::Certified(cert) = &state.2 {
if let Some(old_cert) = &highest_certificate_cert {
if cert.checkpoint.sequence_number > old_cert.checkpoint.sequence_number {
if cert.summary.sequence_number > old_cert.summary.sequence_number {
highest_certificate_cert = Some(cert.clone());
}
} else {
Expand All @@ -333,7 +333,7 @@ where
#[allow(clippy::type_complexity)]
let mut partial_checkpoints: BTreeMap<
(CheckpointSequenceNumber, CheckpointDigest),
Vec<(AuthorityName, SignedCheckpoint)>,
Vec<(AuthorityName, SignedCheckpointSummary)>,
> = BTreeMap::new();
final_state
.responses
Expand All @@ -342,19 +342,14 @@ where
if let AuthenticatedCheckpoint::Signed(signed) = checkpoint {
// We check this signature is higher than the highest known checkpoint.
if let Some(newest_checkpoint) = &highest_certificate_cert {
if newest_checkpoint.checkpoint.sequence_number
> signed.checkpoint.sequence_number
{
if newest_checkpoint.summary.sequence_number > signed.summary.sequence_number {
return;
}
}

// Collect signed checkpoints by sequence number and digest.
partial_checkpoints
.entry((
signed.checkpoint.sequence_number,
signed.checkpoint.digest(),
))
.entry((signed.summary.sequence_number, signed.summary.digest()))
.or_insert_with(Vec::new)
.push((*auth, signed.clone()));
}
Expand All @@ -376,7 +371,7 @@ where
// the checkpoint for others to download.
if weight >= net.committee.validity_threshold() {
// Try to construct a valid checkpoint.
let certificate = CertifiedCheckpoint::aggregate(
let certificate = CertifiedCheckpointSummary::aggregate(
signed.iter().map(|(_, signed)| signed.clone()).collect(),
&net.committee,
);
Expand All @@ -390,7 +385,7 @@ where
// >2/3 of validators proposing a new checkpoint.
let next_proposal_sequence_number = highest_certificate_cert
.as_ref()
.map(|cert| cert.checkpoint.sequence_number + 1)
.map(|cert| cert.summary.sequence_number + 1)
.unwrap_or(0);

// Collect proposals
Expand All @@ -399,7 +394,7 @@ where
.iter()
.filter_map(|(auth, proposal, _checkpoint)| {
if let Some(p) = proposal {
if p.0.checkpoint.sequence_number == next_proposal_sequence_number {
if p.summary.sequence_number == next_proposal_sequence_number {
return Some((*auth, p.clone()));
}
}
Expand All @@ -415,7 +410,7 @@ pub async fn sync_to_checkpoint<A>(
name: AuthorityName,
net: Arc<AuthorityAggregator<A>>,
checkpoint_db: Arc<Mutex<CheckpointStore>>,
latest_known_checkpoint: CertifiedCheckpoint,
latest_known_checkpoint: CertifiedCheckpointSummary,
) -> Result<(), SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
Expand All @@ -430,7 +425,7 @@ where
// Check if the latest checkpoint is merely a signed checkpoint, and if
// so download a full certificate for it.
if let Some(AuthenticatedCheckpoint::Signed(signed)) = &latest_checkpoint {
let seq = *signed.checkpoint.sequence_number();
let seq = *signed.summary.sequence_number();
debug!("Partial Sync ({name:?}): {seq:?}",);
let (past, _contents) =
get_one_checkpoint(net.clone(), seq, false, &available_authorities).await?;
Expand All @@ -446,13 +441,13 @@ where

let full_sync_start = latest_checkpoint
.map(|chk| match chk {
AuthenticatedCheckpoint::Signed(signed) => signed.checkpoint.sequence_number + 1,
AuthenticatedCheckpoint::Certified(cert) => cert.checkpoint.sequence_number + 1,
AuthenticatedCheckpoint::Signed(signed) => signed.summary.sequence_number + 1,
AuthenticatedCheckpoint::Certified(cert) => cert.summary.sequence_number + 1,
AuthenticatedCheckpoint::None => unreachable!(),
})
.unwrap_or(0);

for seq in full_sync_start..latest_known_checkpoint.checkpoint.sequence_number {
for seq in full_sync_start..latest_known_checkpoint.summary.sequence_number {
debug!("Full Sync ({name:?}): {seq:?}");
let (past, _contents) =
get_one_checkpoint(net.clone(), seq, true, &available_authorities).await?;
Expand All @@ -478,7 +473,7 @@ pub async fn get_one_checkpoint<A>(
sequence_number: CheckpointSequenceNumber,
contents: bool,
available_authorities: &BTreeSet<AuthorityName>,
) -> Result<(CertifiedCheckpoint, Option<CheckpointContents>), SuiError>
) -> Result<(CertifiedCheckpointSummary, Option<CheckpointContents>), SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
Expand Down Expand Up @@ -525,7 +520,7 @@ where
pub async fn get_checkpoint_contents<A>(
name: AuthorityName,
net: Arc<AuthorityAggregator<A>>,
checkpoint: &CertifiedCheckpoint,
checkpoint: &CertifiedCheckpointSummary,
) -> Result<CheckpointContents, SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
Expand All @@ -547,7 +542,7 @@ where
let client = net.clone_client(sample_authority);
match client
.handle_checkpoint(CheckpointRequest::past(
checkpoint.checkpoint.sequence_number,
checkpoint.summary.sequence_number,
true,
))
.await
Expand All @@ -557,7 +552,7 @@ where
detail: Some(contents),
}) => {
// Check here that the digest of contents matches
if contents.digest() != checkpoint.checkpoint.content_digest {
if contents.digest() != checkpoint.summary.content_digest {
// A byzantine authority!
// TODO: Report Byzantine authority
warn!("Sync Error: Incorrect contents returned");
Expand All @@ -583,7 +578,7 @@ pub async fn diff_proposals<A>(
active_authority: &ActiveAuthority<A>,
checkpoint_db: Arc<Mutex<CheckpointStore>>,
my_proposal: &CheckpointProposal,
proposals: Vec<(AuthorityName, SignedCheckpointProposal)>,
proposals: Vec<(AuthorityName, SignedCheckpointSummary)>,
consensus_delay_estimate: Duration,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
Expand All @@ -597,7 +592,7 @@ pub async fn diff_proposals<A>(

loop {
let next_checkpoint_sequence_number: u64 = checkpoint_db.lock().next_checkpoint();
if next_checkpoint_sequence_number > *my_proposal.proposal.0.checkpoint.sequence_number() {
if next_checkpoint_sequence_number > *my_proposal.signed_summary.summary.sequence_number() {
// Our work here is done, we have progressed past the checkpoint for which we were given a proposal.
// Our DB has been updated (presumably by consensus) with the sought information (a checkpoint
// for this sequence number)
Expand All @@ -622,7 +617,7 @@ pub async fn diff_proposals<A>(
if let AuthorityCheckpointInfo::Proposal { current, previous } = &response.info {
// Check if there is a latest checkpoint
if let AuthenticatedCheckpoint::Certified(prev) = previous {
if prev.checkpoint.sequence_number > next_checkpoint_sequence_number {
if prev.summary.sequence_number > next_checkpoint_sequence_number {
// We are now way behind, return
return;
}
Expand All @@ -634,7 +629,7 @@ pub async fn diff_proposals<A>(
}

// Check the proposal is also for the same checkpoint sequence number
if current.as_ref().unwrap().0.checkpoint.sequence_number()
if current.as_ref().unwrap().summary.sequence_number()
!= my_proposal.sequence_number()
{
return;
Expand All @@ -652,8 +647,8 @@ pub async fn diff_proposals<A>(
{
Ok(fragment) => {
// On success send the fragment to consensus
let proposer = &fragment.proposer.0.authority;
let other = &fragment.other.0.authority;
let proposer = fragment.proposer.authority();
let other = fragment.other.authority();
debug!("Send fragment: {proposer:?} -- {other:?}");
let _ = checkpoint_db.lock().handle_receive_fragment(
&fragment,
Expand Down Expand Up @@ -712,7 +707,7 @@ where
let client = active_authority
.net
.load()
.clone_client(&fragment.other.0.authority);
.clone_client(fragment.other.authority());
for tx_digest in &fragment.diff.first.items {
let response = client
.handle_transaction_info_request(TransactionInfoRequest::from(tx_digest.transaction))
Expand Down
Loading

0 comments on commit 88abe65

Please sign in to comment.