Skip to content

Commit

Permalink
[core] Fix committee change test | unconditionally accumulate epoch (M…
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith authored Mar 2, 2023
1 parent 3bdcf31 commit 4e63679
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 62 deletions.
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2680,7 +2680,8 @@ impl AuthorityState {
} else {
info!(
"validator {:?} does not support {:?}",
authority, next_protocol_version
authority.concise(),
next_protocol_version
);
}
}
Expand Down
88 changes: 50 additions & 38 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{

use futures::stream::FuturesOrdered;
use itertools::izip;
use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{spawn_monitored_task, MonitoredFutureExt};
use prometheus::Registry;
use sui_config::node::CheckpointExecutorConfig;
use sui_types::error::SuiResult;
Expand All @@ -36,10 +36,7 @@ use sui_types::{
messages::TransactionEffects,
messages_checkpoint::{CheckpointSequenceNumber, EndOfEpochData, VerifiedCheckpoint},
};
use sui_types::{
committee::{Committee, EpochId},
message_envelope::Message,
};
use sui_types::{committee::Committee, message_envelope::Message};
use tap::TapFallible;
use tokio::{
sync::broadcast::{self, error::RecvError},
Expand Down Expand Up @@ -151,8 +148,9 @@ impl CheckpointExecutor {

loop {
// If we have executed the last checkpoint of the current epoch, stop.
if let Some(next_epoch_committee) =
check_epoch_last_checkpoint(epoch_store.epoch(), &highest_executed)
if let Some(next_epoch_committee) = self
.check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
.await
{
// be extra careful to ensure we don't have orphans
assert!(
Expand Down Expand Up @@ -333,41 +331,55 @@ impl CheckpointExecutor {
checkpoint
}));
}
}

/// Check whether `checkpoint` is the last checkpoint of the current epoch. If so, return the
/// committee of the next epoch.
fn check_epoch_last_checkpoint(
cur_epoch: EpochId,
checkpoint: &Option<VerifiedCheckpoint>,
) -> Option<Committee> {
if let Some(checkpoint) = checkpoint {
if checkpoint.epoch() == cur_epoch {
if let Some(EndOfEpochData {
next_epoch_committee,
next_epoch_protocol_version,
..
}) = &checkpoint.summary.end_of_epoch_data
{
info!(
ended_epoch = cur_epoch,
?next_epoch_protocol_version,
last_checkpoint = checkpoint.sequence_number(),
"Reached end of epoch",
);
let next_epoch = cur_epoch + 1;
return Some(
Committee::new(
next_epoch,
*next_epoch_protocol_version,
next_epoch_committee.iter().cloned().collect(),
)
.expect("Creating new committee object cannot fail"),
);
/// Determine whether `checkpoint` closes out the epoch tracked by `epoch_store`.
///
/// Returns `None` when `checkpoint` is `None`, when it belongs to a different epoch than
/// `epoch_store.epoch()`, or when its summary carries no end-of-epoch data. Otherwise the
/// epoch is accumulated through `self.accumulator` and the committee for the following
/// epoch (`cur_epoch + 1`) is returned.
///
/// # Panics
///
/// Panics if accumulating the epoch fails, or if the next-epoch `Committee` cannot be
/// constructed from the end-of-epoch data.
pub async fn check_epoch_last_checkpoint(
    &self,
    epoch_store: Arc<AuthorityPerEpochStore>,
    checkpoint: &Option<VerifiedCheckpoint>,
) -> Option<Committee> {
    let cur_epoch = epoch_store.epoch();

    // Guard clauses: only a checkpoint of the current epoch that carries end-of-epoch
    // data marks the epoch boundary; anything else yields `None`.
    let ckpt = checkpoint.as_ref()?;
    if ckpt.epoch() != cur_epoch {
        return None;
    }
    let EndOfEpochData {
        next_epoch_committee,
        next_epoch_protocol_version,
        ..
    } = ckpt.summary.end_of_epoch_data.as_ref()?;

    info!(
        ended_epoch = cur_epoch,
        ?next_epoch_protocol_version,
        last_checkpoint = ckpt.sequence_number(),
        "Reached end of epoch",
    );

    // Accumulate the finished epoch before handing back the next committee.
    let accumulated = self
        .accumulator
        .accumulate_epoch(&cur_epoch, ckpt.sequence_number(), epoch_store.clone())
        .in_monitored_scope("CheckpointExecutor::accumulate_epoch")
        .await;
    accumulated.expect("Accumulating epoch cannot fail");

    let upcoming_committee = Committee::new(
        cur_epoch + 1,
        *next_epoch_protocol_version,
        next_epoch_committee.iter().cloned().collect(),
    )
    .expect("Creating new committee object cannot fail");

    Some(upcoming_committee)
}
None
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
Expand Down
26 changes: 4 additions & 22 deletions crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn test_checkpoint_executor_cross_epoch() {
let tempdir = tempdir().unwrap();
let checkpoint_store = CheckpointStore::new(tempdir.path());

let (authority_state, mut executor, accumulator, checkpoint_sender, first_committee): (
let (authority_state, mut executor, _accumulator, checkpoint_sender, first_committee): (
Arc<AuthorityState>,
CheckpointExecutor,
Arc<StateAccumulator>,
Expand Down Expand Up @@ -174,7 +174,7 @@ pub async fn test_checkpoint_executor_cross_epoch() {
.unwrap();
// sync end of epoch checkpoint
let last_executed_checkpoint = next_epoch_checkpoints.last().cloned().unwrap();
let (end_of_epoch_1_checkpoint, _third_committee) = sync_end_of_epoch_checkpoint(
let (_end_of_epoch_1_checkpoint, _third_committee) = sync_end_of_epoch_checkpoint(
&checkpoint_store,
&checkpoint_sender,
last_executed_checkpoint.clone(),
Expand All @@ -196,17 +196,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
.await
.unwrap();

let first_epoch = 0;

accumulator
.digest_epoch(
&first_epoch,
end_of_epoch_0_checkpoint.sequence_number(),
epoch_store.clone(),
)
.await
.unwrap();

// We should have synced up to epoch boundary
assert_eq!(
checkpoint_store
Expand All @@ -216,6 +205,8 @@ pub async fn test_checkpoint_executor_cross_epoch() {
num_to_sync_per_epoch as u64,
);

let first_epoch = 0;

// Ensure root state hash for epoch exists at end of epoch
assert!(authority_state
.database
Expand Down Expand Up @@ -261,15 +252,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
let second_epoch = 1;
assert!(second_epoch == new_epoch_store.epoch());

accumulator
.digest_epoch(
&second_epoch,
end_of_epoch_1_checkpoint.sequence_number(),
new_epoch_store.clone(),
)
.await
.unwrap();

assert!(authority_state
.database
.perpetual_tables
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl StateAccumulator {
.root_state_notify_read
.notify(epoch, &(last_checkpoint_of_epoch, root_state_hash.clone()));

debug!("Accumulated epoch {}", epoch);
Ok(root_state_hash)
}

Expand Down
1 change: 0 additions & 1 deletion crates/sui/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ async fn test_validator_resign_effects() {

// TODO: This test is currently flaky. Need to re-enable it once we fix the issue.
#[sim_test]
#[ignore]
async fn test_reconfig_with_committee_change_basic() {
// This test exercises the full flow of a validator joining the network, catching up, and then leaving.

Expand Down

0 comments on commit 4e63679

Please sign in to comment.