Skip to content

Commit

Permalink
Collect change epoch system transaction cert and execute (MystenLabs#…
Browse files Browse the repository at this point in the history
…2395)

* Collect change epoch cert

* Address feedback
  • Loading branch information
lxfind authored Jun 6, 2022
1 parent a14d3ae commit 663b7ae
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 69 deletions.
28 changes: 16 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
transaction_input_checker,
};
use anyhow::anyhow;
use arc_swap::{ArcSwap, ArcSwapOption};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use itertools::Itertools;
use move_binary_format::CompiledModule;
Expand Down Expand Up @@ -232,7 +232,6 @@ pub struct AuthorityState {
/// A global lock to halt all transaction/cert processing.
#[allow(dead_code)]
pub(crate) halted: AtomicBool,
pub(crate) change_epoch_tx: ArcSwapOption<SignedTransaction>,

/// Move native functions that are available to invoke
pub(crate) _native_functions: NativeFunctionTable,
Expand Down Expand Up @@ -281,12 +280,6 @@ impl AuthorityState {
&self,
transaction: Transaction,
) -> Result<TransactionInfoResponse, SuiError> {
// Validators should never sign an external system transaction.
fp_ensure!(
!transaction.data.kind.is_system_tx(),
SuiError::InvalidSystemTransaction
);

let transaction_digest = *transaction.digest();
// Ensure an idempotent answer.
if self.database.transaction_exists(&transaction_digest)? {
Expand All @@ -295,6 +288,12 @@ impl AuthorityState {
return Ok(transaction_info);
}

// Validators should never sign an external system transaction.
fp_ensure!(
!transaction.data.kind.is_system_tx(),
SuiError::InvalidSystemTransaction
);

if self.halted.load(Ordering::SeqCst) {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
Expand Down Expand Up @@ -371,7 +370,13 @@ impl AuthorityState {
return Ok(info);
}

if self.halted.load(Ordering::SeqCst) {
if self.halted.load(Ordering::SeqCst)
&& !confirmation_transaction
.certificate
.data
.kind
.is_system_tx()
{
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}
Expand Down Expand Up @@ -825,7 +830,6 @@ impl AuthorityState {
secret,
committee: ArcSwap::from(Arc::new(current_epoch_info.committee)),
halted: AtomicBool::new(current_epoch_info.validator_halted),
change_epoch_tx: ArcSwapOption::empty(),
_native_functions: native_functions,
move_vm,
database: store.clone(),
Expand Down Expand Up @@ -903,7 +907,7 @@ impl AuthorityState {
Ok(())
}

pub(crate) fn begin_new_epoch(&self) -> SuiResult {
pub(crate) fn unhalt_validator(&self) -> SuiResult {
let epoch_info = self.database.get_last_epoch_info()?;
assert_eq!(
&epoch_info.committee,
Expand Down Expand Up @@ -1145,7 +1149,7 @@ impl AuthorityState {
certificate: &CertifiedTransaction,
signed_effects: &SignedTransactionEffects,
) -> SuiResult {
if self.halted.load(Ordering::SeqCst) {
if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() {
// TODO: Here we should allow consensus transaction to continue.
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
Expand Down
59 changes: 49 additions & 10 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority_active::ActiveAuthority;
use crate::authority_aggregator::AuthorityAggregator;
use crate::authority_client::AuthorityAPI;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use sui_types::committee::Committee;
use sui_types::crypto::PublicKeyBytes;
use sui_types::error::SuiResult;
use sui_types::messages::SignedTransaction;
use sui_types::messages::{ConfirmationTransaction, SignedTransaction};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use typed_store::Map;

// TODO: Make last checkpoint number of each epoch more flexible.
pub const CHECKPOINT_COUNT_PER_EPOCH: u64 = 200;

impl<A> ActiveAuthority<A> {
const WAIT_BETWEEN_EPOCH_TX_QUERY_RETRY: Duration = Duration::from_millis(300);

impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
/// This function should be called by the active checkpoint process, when it finishes processing
/// all transactions from the second to the least checkpoint of the epoch. It's called by a
/// validator that belongs to the committee of the current epoch.
pub async fn start_epoch_change(&self) -> SuiResult {
if let Some(checkpoints) = &self.state.checkpoints {
let mut checkpoints = checkpoints.lock();
Expand All @@ -41,6 +51,9 @@ impl<A> ActiveAuthority<A> {
Ok(())
}

/// This function should be called by the active checkpoint process, when it finishes processing
/// all transactions from the last checkpoint of the epoch. This function needs to be called by
/// a validator that belongs to the committee of the next epoch.
pub async fn finish_epoch_change(&self) -> SuiResult {
assert!(
self.state.halted.load(Ordering::SeqCst),
Expand Down Expand Up @@ -81,7 +94,14 @@ impl<A> ActiveAuthority<A> {
.collect();
let new_committee = Committee::new(next_epoch, votes);
self.state.insert_new_epoch_info(&new_committee)?;
//self.state.checkpoints.as_ref().unwrap().lock().committee = new_committee;
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
self.net.load().clone_inner_clients(),
));
self.net.store(new_net.clone());
// TODO: Also reconnect network if changed.
// This is blocked for now since we are not storing network info on-chain yet.

// TODO: Update all committee in all components safely,
// potentially restart some authority clients.
// Including: self.net, narwhal committee/consensus adapter,
Expand All @@ -95,22 +115,41 @@ impl<A> ActiveAuthority<A> {
self.state.name,
&*self.state.secret,
);
// Add the signed transaction to the store.
self.state
.change_epoch_tx
.store(Some(Arc::new(advance_epoch_tx)));
.set_transaction_lock(&[], advance_epoch_tx.clone())
.await?;

// Collect a certificate for this system transaction that changes epoch,
// and execute it locally.
loop {
if let Ok(certificate) = new_net
.process_transaction(
advance_epoch_tx.clone().to_transaction(),
Duration::from_secs(0),
)
.await
{
self.state
.handle_confirmation_transaction(ConfirmationTransaction { certificate })
.await
.expect("Executing the special cert cannot fail");
break;
}

// TODO: Now ask every validator in the committee for this signed tx.
// Aggregate them to obtain a cert, execute the cert, and then start the new epoch.
tokio::time::sleep(WAIT_BETWEEN_EPOCH_TX_QUERY_RETRY).await;
}

self.state.begin_new_epoch()?;
// Resume the validator to start accepting transactions for the new epoch.
self.state.unhalt_validator()?;
Ok(())
}

fn is_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool {
pub fn is_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool {
checkpoint > 0 && checkpoint % CHECKPOINT_COUNT_PER_EPOCH == 0
}

fn is_second_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool {
pub fn is_second_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool {
(checkpoint + 1) % CHECKPOINT_COUNT_PER_EPOCH == 0
}
}
116 changes: 73 additions & 43 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sui_types::{
gas::SuiGasStatus,
messages::{ConfirmationTransaction, SignatureAggregator, Transaction, TransactionData},
object::Object,
SUI_SYSTEM_STATE_OBJECT_ID,
};

use crate::{
Expand Down Expand Up @@ -57,7 +58,7 @@ async fn test_start_epoch_change() {
})
.unwrap();
// Create an active authority for the first authority state.
let active = ActiveAuthority::new(state.clone(), net.authority_clients).unwrap();
let active = ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap();
// Make the high watermark differ from low watermark.
let ticket = state.batch_notifier.ticket().unwrap();

Expand Down Expand Up @@ -167,50 +168,79 @@ async fn test_finish_epoch_change() {
genesis_objects.clone(),
])
.await;
let state = states[0].clone();
// Set the checkpoint number to be near the end of epoch.
let mut locals = CheckpointLocals {
next_checkpoint: CHECKPOINT_COUNT_PER_EPOCH - 1,
proposal_next_transaction: None,
next_transaction_sequence: 0,
no_more_fragments: true,
current_proposal: None,
};
state
.checkpoints
.as_ref()
.unwrap()
.lock()
.set_locals_for_testing(locals.clone())
.unwrap();
// Create an active authority for the first authority state.
let active = ActiveAuthority::new(state.clone(), net.authority_clients).unwrap();
let actives: Vec<_> = states
.iter()
.map(|state| ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap())
.collect();
let results: Vec<_> = states
.iter()
.zip(actives.iter())
.map(|(state, active)| {
async {
// Set the checkpoint number to be near the end of epoch.
let mut locals = CheckpointLocals {
next_checkpoint: CHECKPOINT_COUNT_PER_EPOCH - 1,
proposal_next_transaction: None,
next_transaction_sequence: 0,
no_more_fragments: true,
current_proposal: None,
};
state
.checkpoints
.as_ref()
.unwrap()
.lock()
.set_locals_for_testing(locals.clone())
.unwrap();

active.start_epoch_change().await.unwrap();
active.start_epoch_change().await.unwrap();

locals.next_checkpoint += 1;
state
.checkpoints
.as_ref()
.unwrap()
.lock()
.set_locals_for_testing(locals.clone())
.unwrap();
active.finish_epoch_change().await.unwrap();
// Verify that epoch changed in authority state.
assert_eq!(active.state.committee.load().epoch, 1);
assert_eq!(
active
locals.next_checkpoint += 1;
state
.checkpoints
.as_ref()
.unwrap()
.lock()
.set_locals_for_testing(locals.clone())
.unwrap();

active.finish_epoch_change().await.unwrap()
}
})
.collect();
futures::future::join_all(results).await;

// Verify that epoch changed in every authority state.
for active in actives {
assert_eq!(active.state.committee.load().epoch, 1);
assert_eq!(active.net.load().committee.epoch, 1);
assert_eq!(
active
.state
.db()
.get_last_epoch_info()
.unwrap()
.committee
.epoch,
1
);
// Verify that validator is no longer halted.
assert!(!active.state.halted.load(Ordering::SeqCst));
let system_state = active.state.get_sui_system_state_object().await.unwrap();
assert_eq!(system_state.epoch, 1);
let (_, tx_digest) = active
.state
.db()
.get_last_epoch_info()
.get_latest_parent_entry(SUI_SYSTEM_STATE_OBJECT_ID)
.await
.unwrap()
.committee
.epoch,
1
);
// Verify that validator is no longer halted.
assert!(!active.state.halted.load(Ordering::SeqCst));
// Verify that the special system tx is set.
assert!(active.state.change_epoch_tx.load().is_some());
.unwrap();
let response = active
.state
.handle_transaction_info_request(tx_digest.into())
.await
.unwrap();
assert!(response.signed_effects.is_some());
assert!(response.certified_transaction.is_some());
assert!(response.signed_effects.is_some());
}
}
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ async fn test_genesis_sui_sysmtem_state_object() {
async fn test_change_epoch_transaction() {
let authority_state = init_state().await;
let signed_tx = SignedTransaction::new_change_epoch(
0,
1,
100,
100,
authority_state.name,
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,14 +608,14 @@ impl SignedTransaction {
}

pub fn new_change_epoch(
cur_epoch: EpochId,
next_epoch: EpochId,
storage_charge: u64,
computation_charge: u64,
authority: AuthorityName,
secret: &dyn signature::Signer<AuthoritySignature>,
) -> Self {
let kind = TransactionKind::Single(SingleTransactionKind::ChangeEpoch(ChangeEpoch {
epoch: cur_epoch + 1,
epoch: next_epoch,
storage_charge,
computation_charge,
}));
Expand All @@ -633,7 +633,7 @@ impl SignedTransaction {
data,
tx_signature: Signature::new_empty(),
auth_sign_info: AuthoritySignInfo {
epoch: cur_epoch,
epoch: next_epoch,
authority,
signature,
},
Expand Down

0 comments on commit 663b7ae

Please sign in to comment.