Skip to content

Commit

Permalink
Epoch reconfiguration library (MystenLabs#2302)
Browse files Browse the repository at this point in the history
* [epoch] Add reconfiguration logic

* Add tests

* Define StakeUnit

* Don't update active authority's committee yet

* Do not change mutability of checkpoint store functions

* Address comments

* rebase
  • Loading branch information
lxfind authored Jun 1, 2022
1 parent ff102aa commit 3acaf2a
Show file tree
Hide file tree
Showing 37 changed files with 876 additions and 302 deletions.
3 changes: 2 additions & 1 deletion crates/sui-config/src/genesis_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use sui_types::base_types::{ObjectID, SuiAddress, TxContext};
use sui_types::committee::StakeUnit;
use sui_types::crypto::{get_key_pair_from_rng, KeyPair};
use sui_types::object::Object;
use tracing::info;
Expand Down Expand Up @@ -122,7 +123,7 @@ impl GenesisConfig {
pub struct ValidatorGenesisInfo {
pub key_pair: KeyPair,
pub network_address: Multiaddr,
pub stake: usize,
pub stake: StakeUnit,
pub narwhal_primary_to_primary: Multiaddr,
pub narwhal_worker_to_primary: Multiaddr,
pub narwhal_primary_to_worker: Multiaddr,
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fs;
use std::path::{Path, PathBuf};
use sui_types::committee::StakeUnit;
use tracing::trace;

pub mod builder;
Expand All @@ -32,7 +33,7 @@ pub const AUTHORITIES_DB_NAME: &str = "authorities_db";
pub const CONSENSUS_DB_NAME: &str = "consensus_db";
pub const FULL_NODE_DB_PATH: &str = "full_node_db";

const DEFAULT_STAKE: usize = 1;
const DEFAULT_STAKE: StakeUnit = 1;

pub fn sui_config_dir() -> Result<PathBuf, anyhow::Error> {
match std::env::var_os("SUI_CONFIG_DIR") {
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use sui_types::base_types::SuiAddress;
use sui_types::committee::StakeUnit;
use sui_types::crypto::{KeyPair, PublicKeyBytes};

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -123,7 +124,7 @@ impl ConsensusConfig {
#[serde(rename_all = "kebab-case")]
pub struct ValidatorInfo {
pub public_key: PublicKeyBytes,
pub stake: usize,
pub stake: StakeUnit,
pub network_address: Multiaddr,
}

Expand All @@ -136,7 +137,7 @@ impl ValidatorInfo {
self.public_key
}

pub fn stake(&self) -> usize {
pub fn stake(&self) -> StakeUnit {
self.stake
}

Expand Down
104 changes: 91 additions & 13 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
transaction_input_checker,
};
use anyhow::anyhow;
use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use itertools::Itertools;
use move_binary_format::CompiledModule;
Expand All @@ -30,6 +31,7 @@ use parking_lot::Mutex;
use prometheus_exporter::prometheus::{
register_histogram, register_int_counter, Histogram, IntCounter,
};
use std::ops::Deref;
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
pin::Pin,
Expand All @@ -52,7 +54,7 @@ use sui_types::{
messages::*,
object::{Data, Object, ObjectFormatOptions, ObjectRead},
storage::{BackingPackageStore, DeleteKind, Storage},
MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS,
MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, SUI_SYSTEM_STATE_OBJECT_ID,
};
use tracing::{debug, error, instrument};
use typed_store::Map;
Expand All @@ -79,6 +81,7 @@ pub use temporary_store::AuthorityTemporaryStore;
mod authority_store;
pub use authority_store::{AuthorityStore, GatewayStore, ReplicaStore, SuiDataStore};
use sui_types::object::Owner;
use sui_types::sui_system_state::SuiSystemState;

use self::authority_store::{
generate_genesis_system_object, store_package_and_init_modules_for_genesis,
Expand Down Expand Up @@ -219,14 +222,15 @@ pub struct AuthorityState {
pub secret: StableSyncAuthoritySigner,

/// Committee of this Sui instance.
pub committee: Committee,
pub committee: ArcSwap<Committee>,
/// A global lock to halt all transaction/cert processing.
#[allow(dead_code)]
halted: AtomicBool,
pub(crate) halted: AtomicBool,
pub(crate) change_epoch_tx: ArcSwapOption<SignedTransaction>,

/// Move native functions that are available to invoke
_native_functions: NativeFunctionTable,
move_vm: Arc<MoveVM>,
pub(crate) _native_functions: NativeFunctionTable,
pub(crate) move_vm: Arc<MoveVM>,

/// The database
pub(crate) database: Arc<AuthorityStore>, // TODO: remove pub
Expand Down Expand Up @@ -281,6 +285,11 @@ impl AuthorityState {
return Ok(transaction_info);
}

if self.halted.load(Ordering::SeqCst) {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

let (_gas_status, all_objects) = transaction_input_checker::check_transaction_input(
&self.database,
&transaction,
Expand All @@ -290,8 +299,12 @@ impl AuthorityState {

let owned_objects = transaction_input_checker::filter_owned_objects(&all_objects);

let signed_transaction =
SignedTransaction::new(self.committee.epoch, transaction, self.name, &*self.secret);
let signed_transaction = SignedTransaction::new(
self.committee.load().epoch,
transaction,
self.name,
&*self.secret,
);

// Check and write locks, to signed transaction, into the database
// The call to self.set_transaction_lock checks the lock is not conflicting,
Expand Down Expand Up @@ -348,9 +361,18 @@ impl AuthorityState {
return Ok(info);
}

if self.halted.load(Ordering::SeqCst) {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

// Check the certificate and retrieve the transfer data.
tracing::trace_span!("cert_check_signature")
.in_scope(|| confirmation_transaction.certificate.verify(&self.committee))
.in_scope(|| {
confirmation_transaction
.certificate
.verify(&self.committee.load())
})
.map_err(|e| {
self.metrics.signature_errors.inc();
e
Expand Down Expand Up @@ -481,7 +503,7 @@ impl AuthorityState {
&self.move_vm,
&self._native_functions,
gas_status,
self.committee.epoch,
self.committee.load().epoch,
)?;

self.metrics.total_effects.inc();
Expand All @@ -491,7 +513,7 @@ impl AuthorityState {

// TODO: Distribute gas charge and rebate, which can be retrieved from effects.
let signed_effects =
effects.to_sign_effects(self.committee.epoch, &self.name, &*self.secret);
effects.to_sign_effects(self.committee.load().epoch, &self.name, &*self.secret);

// Update the database in an atomic manner
self.update_state(temporary_store, &certificate, &signed_effects)
Expand Down Expand Up @@ -762,8 +784,9 @@ impl AuthorityState {
let mut state = AuthorityState {
name,
secret,
committee: current_epoch_info.committee,
committee: ArcSwap::from(Arc::new(current_epoch_info.committee)),
halted: AtomicBool::new(current_epoch_info.validator_halted),
change_epoch_tx: ArcSwapOption::empty(),
_native_functions: native_functions,
move_vm,
database: store.clone(),
Expand Down Expand Up @@ -817,6 +840,37 @@ impl AuthorityState {
state
}

/// Records `new_committee` as the latest epoch info and makes it the in-memory committee.
///
/// The entry is persisted with `validator_halted: true`. The in-memory `halted` flag is
/// not modified here.
///
/// # Errors
/// Returns `SuiError::InconsistentEpochState` when the epoch of the last stored epoch info
/// is greater than `new_committee.epoch`, and propagates any error from the database.
pub(crate) fn insert_new_epoch_info(&self, new_committee: &Committee) -> SuiResult {
    let last_stored_epoch = self.database.get_last_epoch_info()?.committee.epoch;
    fp_ensure!(
        new_committee.epoch >= last_stored_epoch,
        SuiError::InconsistentEpochState {
            error: String::from("Trying to insert an old epoch entry"),
        }
    );

    // Persist first, so the in-memory committee is only swapped after the write succeeded.
    let halted_epoch_info = EpochInfoLocals {
        committee: new_committee.clone(),
        validator_halted: true,
    };
    self.database.insert_new_epoch_info(halted_epoch_info)?;

    self.committee.store(Arc::new(new_committee.clone()));
    Ok(())
}

/// Marks the current epoch as started: re-persists the current committee with
/// `validator_halted: false` and clears the in-memory `halted` flag.
///
/// # Errors
/// Propagates any error returned by the database.
///
/// # Panics
/// Panics if the committee of the last stored epoch info differs from the in-memory
/// committee.
pub(crate) fn begin_new_epoch(&self) -> SuiResult {
    let epoch_info = self.database.get_last_epoch_info()?;
    // Snapshot the in-memory committee once, so that the committee that is checked below is
    // exactly the one that gets persisted.
    let current_committee = self.clone_committee();
    assert_eq!(
        epoch_info.committee, current_committee,
        "About to begin new epoch, however current committee differs from epoch store"
    );
    self.database.insert_new_epoch_info(EpochInfoLocals {
        committee: current_committee,
        validator_halted: false,
    })?;
    // Only un-halt in memory after the un-halted state has been written to the store.
    self.halted.store(false, Ordering::SeqCst);
    Ok(())
}

/// Returns another handle to the checkpoint store, or `None` when this authority has none.
pub(crate) fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
    self.checkpoints.as_ref().map(Arc::clone)
}
Expand All @@ -825,6 +879,10 @@ impl AuthorityState {
self.database.clone()
}

pub fn clone_committee(&self) -> Committee {
self.committee.load().clone().deref().clone()
}

/// Looks up `object_id` in the authority's database.
///
/// The result of `self.database.get_object` is returned unchanged.
/// NOTE(review): `Ok(None)` presumably means the object does not exist — confirm against the
/// store implementation.
async fn get_object(&self, object_id: &ObjectID) -> Result<Option<Object>, SuiError> {
self.database.get_object(object_id)
}
Expand All @@ -837,6 +895,20 @@ impl AuthorityState {
.compute_object_reference())
}

/// Loads the object stored under `SUI_SYSTEM_STATE_OBJECT_ID` and decodes its contents into
/// a [`SuiSystemState`].
///
/// # Errors
/// Propagates any error returned by `get_object`.
///
/// # Panics
/// Panics if the object is missing, is not a Move object, or if its contents cannot be
/// BCS-deserialized into `SuiSystemState`.
pub async fn get_sui_system_state_object(&self) -> SuiResult<SuiSystemState> {
    let lookup = self.get_object(&SUI_SYSTEM_STATE_OBJECT_ID).await?;
    let system_state_object = lookup.expect("Sui System State object must always exist");

    let move_object = system_state_object
        .data
        .try_as_move()
        .expect("Sui System State object must be a Move object");

    let system_state: SuiSystemState = bcs::from_bytes(move_object.contents())
        .expect("Sui System State object deserialization cannot fail");
    Ok(system_state)
}

pub async fn get_object_read(&self, object_id: &ObjectID) -> Result<ObjectRead, SuiError> {
match self.database.get_latest_parent_entry(*object_id)? {
None => Ok(ObjectRead::NotExists(*object_id)),
Expand Down Expand Up @@ -1025,12 +1097,18 @@ impl AuthorityState {
/// Update state and signals that a new transactions has been processed
/// to the batch maker service.
#[instrument(name = "db_update_state", level = "debug", skip_all)]
async fn update_state(
pub(crate) async fn update_state(
&self,
temporary_store: AuthorityTemporaryStore<AuthorityStore>,
certificate: &CertifiedTransaction,
signed_effects: &SignedTransactionEffects,
) -> SuiResult {
if self.halted.load(Ordering::SeqCst) {
// TODO: Here we should allow consensus transaction to continue.
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

let notifier_ticket = self.batch_notifier.ticket()?;
let seq = notifier_ticket.seq();

Expand Down Expand Up @@ -1157,7 +1235,7 @@ impl ExecutionState for AuthorityState {
if !self.shared_locks_exist(&certificate).await? {
// Check the certificate. Remember that Byzantine authorities may input anything into
// consensus.
certificate.verify(&self.committee)?;
certificate.verify(&self.committee.load())?;

// Persist the certificate since we are about to lock one or more shared object.
// We thus need to make sure someone (if not the client) can continue the protocol.
Expand Down
9 changes: 7 additions & 2 deletions crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ impl TransactionNotifier {
}

pub fn low_watermark(&self) -> TxSequenceNumber {
self.low_watermark.load(std::sync::atomic::Ordering::SeqCst)
self.low_watermark.load(Ordering::SeqCst)
}

/// Returns `true` when the `high_watermark` read through `self.inner.lock()` equals the
/// current value of the atomic `low_watermark`.
///
/// NOTE(review): presumably equality means every ticket handed out so far has been
/// processed — confirm against how the watermarks are advanced.
// TODO: Return a future instead so that the caller can await it.
pub fn ticket_drained(&self) -> bool {
self.inner.lock().high_watermark == self.low_watermark.load(Ordering::SeqCst)
}

/// Get a ticket with a sequence number
pub fn ticket(self: &Arc<Self>) -> SuiResult<TransactionNotifierTicket> {
if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) {
if self.is_closed.load(Ordering::SeqCst) {
return Err(SuiError::ClosedNotifierError);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<A> ActiveAuthority<A> {
authority: Arc<AuthorityState>,
authority_clients: BTreeMap<AuthorityName, A>,
) -> SuiResult<Self> {
let committee = authority.committee.clone();
let committee = authority.clone_committee();

Ok(ActiveAuthority {
health: Arc::new(Mutex::new(
Expand Down
Loading

0 comments on commit 3acaf2a

Please sign in to comment.