From 663b7ae9bc4e9bb87750d09a156529b9b096d738 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 6 Jun 2022 10:32:55 -0700 Subject: [PATCH] Collect change epoch system transaction cert and execute (#2395) * Collect change epoch cert * Address feedback --- crates/sui-core/src/authority.rs | 28 +++-- crates/sui-core/src/epoch/reconfiguration.rs | 59 +++++++-- .../src/epoch/tests/reconfiguration_tests.rs | 116 +++++++++++------- .../src/unit_tests/authority_tests.rs | 2 +- crates/sui-types/src/messages.rs | 6 +- 5 files changed, 142 insertions(+), 69 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 0ac0f40bd4863..bb2863c11561d 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -13,7 +13,7 @@ use crate::{ transaction_input_checker, }; use anyhow::anyhow; -use arc_swap::{ArcSwap, ArcSwapOption}; +use arc_swap::ArcSwap; use async_trait::async_trait; use itertools::Itertools; use move_binary_format::CompiledModule; @@ -232,7 +232,6 @@ pub struct AuthorityState { /// A global lock to halt all transaction/cert processing. #[allow(dead_code)] pub(crate) halted: AtomicBool, - pub(crate) change_epoch_tx: ArcSwapOption, /// Move native functions that are available to invoke pub(crate) _native_functions: NativeFunctionTable, @@ -281,12 +280,6 @@ impl AuthorityState { &self, transaction: Transaction, ) -> Result { - // Validators should never sign an external system transaction. - fp_ensure!( - !transaction.data.kind.is_system_tx(), - SuiError::InvalidSystemTransaction - ); - let transaction_digest = *transaction.digest(); // Ensure an idempotent answer. if self.database.transaction_exists(&transaction_digest)? { @@ -295,6 +288,12 @@ impl AuthorityState { return Ok(transaction_info); } + // Validators should never sign an external system transaction. + fp_ensure!( + !transaction.data.kind.is_system_tx(), + SuiError::InvalidSystemTransaction + ); + if self.halted.load(Ordering::SeqCst) { // TODO: Do we want to include the new validator set? return Err(SuiError::ValidatorHaltedAtEpochEnd); @@ -371,7 +370,13 @@ impl AuthorityState { return Ok(info); } - if self.halted.load(Ordering::SeqCst) { + if self.halted.load(Ordering::SeqCst) + && !confirmation_transaction + .certificate + .data + .kind + .is_system_tx() + { // TODO: Do we want to include the new validator set? return Err(SuiError::ValidatorHaltedAtEpochEnd); } @@ -825,7 +830,6 @@ impl AuthorityState { secret, committee: ArcSwap::from(Arc::new(current_epoch_info.committee)), halted: AtomicBool::new(current_epoch_info.validator_halted), - change_epoch_tx: ArcSwapOption::empty(), _native_functions: native_functions, move_vm, database: store.clone(), @@ -903,7 +907,7 @@ impl AuthorityState { Ok(()) } - pub(crate) fn begin_new_epoch(&self) -> SuiResult { + pub(crate) fn unhalt_validator(&self) -> SuiResult { let epoch_info = self.database.get_last_epoch_info()?; assert_eq!( &epoch_info.committee, @@ -1145,7 +1149,7 @@ impl AuthorityState { certificate: &CertifiedTransaction, signed_effects: &SignedTransactionEffects, ) -> SuiResult { - if self.halted.load(Ordering::SeqCst) { + if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() { // TODO: Here we should allow consensus transaction to continue. // TODO: Do we want to include the new validator set? return Err(SuiError::ValidatorHaltedAtEpochEnd); diff --git a/crates/sui-core/src/epoch/reconfiguration.rs b/crates/sui-core/src/epoch/reconfiguration.rs index 3cd67dd736e9b..b34c73bd79fbf 100644 --- a/crates/sui-core/src/epoch/reconfiguration.rs +++ b/crates/sui-core/src/epoch/reconfiguration.rs @@ -2,20 +2,30 @@ // SPDX-License-Identifier: Apache-2.0 use crate::authority_active::ActiveAuthority; +use crate::authority_aggregator::AuthorityAggregator; +use crate::authority_client::AuthorityAPI; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use sui_types::committee::Committee; use sui_types::crypto::PublicKeyBytes; use sui_types::error::SuiResult; -use sui_types::messages::SignedTransaction; +use sui_types::messages::{ConfirmationTransaction, SignedTransaction}; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use typed_store::Map; // TODO: Make last checkpoint number of each epoch more flexible. pub const CHECKPOINT_COUNT_PER_EPOCH: u64 = 200; -impl ActiveAuthority { +const WAIT_BETWEEN_EPOCH_TX_QUERY_RETRY: Duration = Duration::from_millis(300); + +impl ActiveAuthority +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + /// This function should be called by the active checkpoint process, when it finishes processing + /// all transactions from the second to the least checkpoint of the epoch. It's called by a + /// validator that belongs to the committee of the current epoch. pub async fn start_epoch_change(&self) -> SuiResult { if let Some(checkpoints) = &self.state.checkpoints { let mut checkpoints = checkpoints.lock(); @@ -41,6 +51,9 @@ impl ActiveAuthority { Ok(()) } + /// This function should be called by the active checkpoint process, when it finishes processing + /// all transactions from the last checkpoint of the epoch. This function needs to be called by + /// a validator that belongs to the committee of the next epoch. pub async fn finish_epoch_change(&self) -> SuiResult { assert!( self.state.halted.load(Ordering::SeqCst), @@ -81,7 +94,14 @@ impl ActiveAuthority { .collect(); let new_committee = Committee::new(next_epoch, votes); self.state.insert_new_epoch_info(&new_committee)?; - //self.state.checkpoints.as_ref().unwrap().lock().committee = new_committee; + let new_net = Arc::new(AuthorityAggregator::new( + new_committee, + self.net.load().clone_inner_clients(), + )); + self.net.store(new_net.clone()); + // TODO: Also reconnect network if changed. + // This is blocked for now since we are not storing network info on-chain yet. + // TODO: Update all committee in all components safely, // potentially restart some authority clients. // Including: self.net, narwhal committee/consensus adapter, @@ -95,22 +115,41 @@ impl ActiveAuthority { self.state.name, &*self.state.secret, ); + // Add the signed transaction to the store. self.state - .change_epoch_tx - .store(Some(Arc::new(advance_epoch_tx))); + .set_transaction_lock(&[], advance_epoch_tx.clone()) + .await?; + + // Collect a certificate for this system transaction that changes epoch, + // and execute it locally. + loop { + if let Ok(certificate) = new_net + .process_transaction( + advance_epoch_tx.clone().to_transaction(), + Duration::from_secs(0), + ) + .await + { + self.state + .handle_confirmation_transaction(ConfirmationTransaction { certificate }) + .await + .expect("Executing the special cert cannot fail"); + break; + } - // TODO: Now ask every validator in the committee for this signed tx. - // Aggregate them to obtain a cert, execute the cert, and then start the new epoch. + tokio::time::sleep(WAIT_BETWEEN_EPOCH_TX_QUERY_RETRY).await; + } - self.state.begin_new_epoch()?; + // Resume the validator to start accepting transactions for the new epoch. + self.state.unhalt_validator()?; Ok(()) } - fn is_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool { + pub fn is_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool { checkpoint > 0 && checkpoint % CHECKPOINT_COUNT_PER_EPOCH == 0 } - fn is_second_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool { + pub fn is_second_last_checkpoint_epoch(checkpoint: CheckpointSequenceNumber) -> bool { (checkpoint + 1) % CHECKPOINT_COUNT_PER_EPOCH == 0 } } diff --git a/crates/sui-core/src/epoch/tests/reconfiguration_tests.rs b/crates/sui-core/src/epoch/tests/reconfiguration_tests.rs index d8f8326ef31f4..571ac847355fb 100644 --- a/crates/sui-core/src/epoch/tests/reconfiguration_tests.rs +++ b/crates/sui-core/src/epoch/tests/reconfiguration_tests.rs @@ -17,6 +17,7 @@ use sui_types::{ gas::SuiGasStatus, messages::{ConfirmationTransaction, SignatureAggregator, Transaction, TransactionData}, object::Object, + SUI_SYSTEM_STATE_OBJECT_ID, }; use crate::{ @@ -57,7 +58,7 @@ async fn test_start_epoch_change() { }) .unwrap(); // Create an active authority for the first authority state. - let active = ActiveAuthority::new(state.clone(), net.authority_clients).unwrap(); + let active = ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap(); // Make the high watermark differ from low watermark. let ticket = state.batch_notifier.ticket().unwrap(); @@ -167,50 +168,79 @@ async fn test_finish_epoch_change() { genesis_objects.clone(), ]) .await; - let state = states[0].clone(); - // Set the checkpoint number to be near the end of epoch. - let mut locals = CheckpointLocals { - next_checkpoint: CHECKPOINT_COUNT_PER_EPOCH - 1, - proposal_next_transaction: None, - next_transaction_sequence: 0, - no_more_fragments: true, - current_proposal: None, - }; - state - .checkpoints - .as_ref() - .unwrap() - .lock() - .set_locals_for_testing(locals.clone()) - .unwrap(); - // Create an active authority for the first authority state. - let active = ActiveAuthority::new(state.clone(), net.authority_clients).unwrap(); + let actives: Vec<_> = states + .iter() + .map(|state| ActiveAuthority::new(state.clone(), net.clone_inner_clients()).unwrap()) + .collect(); + let results: Vec<_> = states + .iter() + .zip(actives.iter()) + .map(|(state, active)| { + async { + // Set the checkpoint number to be near the end of epoch. + let mut locals = CheckpointLocals { + next_checkpoint: CHECKPOINT_COUNT_PER_EPOCH - 1, + proposal_next_transaction: None, + next_transaction_sequence: 0, + no_more_fragments: true, + current_proposal: None, + }; + state + .checkpoints + .as_ref() + .unwrap() + .lock() + .set_locals_for_testing(locals.clone()) + .unwrap(); - active.start_epoch_change().await.unwrap(); + active.start_epoch_change().await.unwrap(); - locals.next_checkpoint += 1; - state - .checkpoints - .as_ref() - .unwrap() - .lock() - .set_locals_for_testing(locals.clone()) - .unwrap(); - active.finish_epoch_change().await.unwrap(); - // Verify that epoch changed in authority state. - assert_eq!(active.state.committee.load().epoch, 1); - assert_eq!( - active + locals.next_checkpoint += 1; + state + .checkpoints + .as_ref() + .unwrap() + .lock() + .set_locals_for_testing(locals.clone()) + .unwrap(); + + active.finish_epoch_change().await.unwrap() + } + }) + .collect(); + futures::future::join_all(results).await; + + // Verify that epoch changed in every authority state. + for active in actives { + assert_eq!(active.state.committee.load().epoch, 1); + assert_eq!(active.net.load().committee.epoch, 1); + assert_eq!( + active + .state + .db() + .get_last_epoch_info() + .unwrap() + .committee + .epoch, + 1 + ); + // Verify that validator is no longer halted. + assert!(!active.state.halted.load(Ordering::SeqCst)); + let system_state = active.state.get_sui_system_state_object().await.unwrap(); + assert_eq!(system_state.epoch, 1); + let (_, tx_digest) = active .state - .db() - .get_last_epoch_info() + .get_latest_parent_entry(SUI_SYSTEM_STATE_OBJECT_ID) + .await .unwrap() - .committee - .epoch, - 1 - ); - // Verify that validator is no longer halted. - assert!(!active.state.halted.load(Ordering::SeqCst)); - // Verify that the special system tx is set. - assert!(active.state.change_epoch_tx.load().is_some()); + .unwrap(); + let response = active + .state + .handle_transaction_info_request(tx_digest.into()) + .await + .unwrap(); + assert!(response.signed_effects.is_some()); + assert!(response.certified_transaction.is_some()); + assert!(response.signed_effects.is_some()); + } } diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index 21a31f61dd2c3..de449a5df9624 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -1260,7 +1260,7 @@ async fn test_genesis_sui_sysmtem_state_object() { async fn test_change_epoch_transaction() { let authority_state = init_state().await; let signed_tx = SignedTransaction::new_change_epoch( - 0, + 1, 100, 100, authority_state.name, diff --git a/crates/sui-types/src/messages.rs b/crates/sui-types/src/messages.rs index f39e3bd1d78c9..f6294e5177f19 100644 --- a/crates/sui-types/src/messages.rs +++ b/crates/sui-types/src/messages.rs @@ -608,14 +608,14 @@ impl SignedTransaction { } pub fn new_change_epoch( - cur_epoch: EpochId, + next_epoch: EpochId, storage_charge: u64, computation_charge: u64, authority: AuthorityName, secret: &dyn signature::Signer, ) -> Self { let kind = TransactionKind::Single(SingleTransactionKind::ChangeEpoch(ChangeEpoch { - epoch: cur_epoch + 1, + epoch: next_epoch, storage_charge, computation_charge, })); @@ -633,7 +633,7 @@ impl SignedTransaction { data, tx_signature: Signature::new_empty(), auth_sign_info: AuthoritySignInfo { - epoch: cur_epoch, + epoch: next_epoch, authority, signature, },