Skip to content

Commit

Permalink
Put epochs to an independent store (MystenLabs#3813)
Browse files Browse the repository at this point in the history
* Put EpochesStore in a separate store

* Adjust benchmark code

* warnings
  • Loading branch information
lxfind authored Aug 8, 2022
1 parent 30097e7 commit 4c5877d
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 361 deletions.
17 changes: 9 additions & 8 deletions crates/sui-benchmark/src/benchmark/validator_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ use multiaddr::Multiaddr;
use rocksdb::Options;
use std::{
env, fs, panic,
path::{Path, PathBuf},
path::PathBuf,
process::{Child, Command},
sync::Arc,
thread,
thread::sleep,
time::Duration,
};
use sui_core::authority::*;
use sui_core::epoch::epoch_store::EpochStore;
use sui_types::{
base_types::{SuiAddress, *},
committee::*,
Expand Down Expand Up @@ -98,10 +99,7 @@ impl ValidatorPreparer {
let validator_config = &network_config.validator_configs()[0];
let committee = network_config.committee();

// Create a random directory to store the DB
let path = env::temp_dir().join(format!("DB_{:?}", ObjectID::random()));
let auth_state = make_authority_state(
&path,
db_cpus as i32,
&committee,
&validator_config.public_key(),
Expand Down Expand Up @@ -257,14 +255,15 @@ pub fn get_multithread_runtime() -> Runtime {
}

fn make_authority_state(
store_path: &Path,
db_cpus: i32,
committee: &Committee,
pubx: &AuthorityPublicKeyBytes,
secx: AuthorityKeyPair,
) -> (AuthorityState, Arc<AuthorityStore>) {
fs::create_dir(&store_path).unwrap();
info!("Open database on path: {:?}", store_path.as_os_str());
// Create a random directory to store the DB
let path = env::temp_dir().join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();
info!("Open database on path: {:?}", path.as_os_str());

let mut opts = Options::default();
opts.increase_parallelism(db_cpus);
Expand All @@ -279,14 +278,16 @@ fn make_authority_state(
// manually.
// opts.set_manual_wal_flush(true);

let store = Arc::new(AuthorityStore::open(store_path, Some(opts)));
let store = Arc::new(AuthorityStore::open(&path.join("store"), Some(opts)));
let epoch_store = Arc::new(EpochStore::new(path.join("epochs")));
(
Runtime::new().unwrap().block_on(async {
AuthorityState::new(
committee.clone(),
*pubx,
Arc::pin(secx),
store.clone(),
epoch_store,
None,
None,
None,
Expand Down
99 changes: 89 additions & 10 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::checkpoints::FragmentInternalError;
use crate::checkpoints::{ConsensusSender, FragmentInternalError};
use crate::{
authority_batch::{BroadcastReceiver, BroadcastSender},
checkpoints::CheckpointStore,
Expand All @@ -17,6 +17,7 @@ use chrono::prelude::*;
use move_bytecode_utils::module_cache::SyncModuleCache;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable};
use narwhal_crypto::traits::KeyPair;
use narwhal_executor::ExecutionStateError;
use narwhal_executor::{ExecutionIndices, ExecutionState};
use parking_lot::Mutex;
Expand All @@ -25,6 +26,7 @@ use prometheus::{
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};
use std::ops::Deref;
use std::path::PathBuf;
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
Expand Down Expand Up @@ -79,10 +81,12 @@ pub use sui_adapter::temporary_store::TemporaryStore;
pub mod authority_store_tables;

mod authority_store;
use crate::epoch::epoch_store::EpochStore;
pub use authority_store::{
AuthorityStore, GatewayStore, ResolverWrapper, SuiDataStore, UpdateType,
};
use sui_types::committee::EpochId;
use sui_types::crypto::AuthorityKeyPair;
use sui_types::messages_checkpoint::{
CheckpointRequest, CheckpointRequestType, CheckpointResponse, CheckpointSequenceNumber,
};
Expand Down Expand Up @@ -300,6 +304,8 @@ pub struct AuthorityState {
/// The checkpoint store
pub checkpoints: Option<Arc<Mutex<CheckpointStore>>>,

pub(crate) epoch_store: Arc<EpochStore>,

// Structures needed for handling batching and notifications.
/// The sender to notify of new transactions
/// and create batches for this authority.
Expand Down Expand Up @@ -1001,8 +1007,8 @@ impl AuthorityState {

pub fn handle_epoch_request(&self, request: &EpochRequest) -> SuiResult<EpochResponse> {
let epoch_info = match &request.epoch_id {
Some(id) => self.database.get_authenticated_epoch(id)?,
None => Some(self.database.get_latest_authenticated_epoch()),
Some(id) => self.epoch_store.get_authenticated_epoch(id)?,
None => Some(self.epoch_store.get_latest_authenticated_epoch()),
};
Ok(EpochResponse { epoch_info })
}
Expand All @@ -1014,6 +1020,7 @@ impl AuthorityState {
name: AuthorityName,
secret: StableSyncAuthoritySigner,
store: Arc<AuthorityStore>,
epoch_store: Arc<EpochStore>,
indexes: Option<Arc<IndexStore>>,
event_store: Option<Arc<EventStoreType>>,
checkpoints: Option<Arc<Mutex<CheckpointStore>>>,
Expand All @@ -1027,22 +1034,24 @@ impl AuthorityState {
adapter::new_move_vm(native_functions.clone())
.expect("We defined natives to not fail here"),
);
// TODO: update this function to not take genesis, committee if store already exists
// Only initialize an empty database.
let committee = if store
if store
.database_is_empty()
.expect("Database read should not fail.")
{
store
.bulk_object_insert(&genesis.objects().iter().collect::<Vec<_>>())
.await
.expect("Cannot bulk insert genesis objects");
store
}

let committee = if epoch_store.database_is_empty() {
epoch_store
.init_genesis_epoch(genesis_committee.clone())
.expect("Init genesis epoch data must not fail");
genesis_committee
} else {
store
epoch_store
.get_latest_authenticated_epoch()
.epoch_info()
.committee()
Expand All @@ -1065,6 +1074,7 @@ impl AuthorityState {
module_cache: SyncModuleCache::new(ResolverWrapper(store.clone())),
event_handler,
checkpoints,
epoch_store,
batch_channels: tx,
batch_notifier: Arc::new(
authority_notifier::TransactionNotifier::new(store.clone())
Expand Down Expand Up @@ -1123,6 +1133,61 @@ impl AuthorityState {
state
}

pub async fn new_for_testing(
genesis_committee: Committee,
key: &AuthorityKeyPair,
store_base_path: Option<PathBuf>,
genesis: Option<&Genesis>,
consensus_sender: Option<Box<dyn ConsensusSender>>,
) -> Self {
let secret = Arc::pin(key.copy());
let path = match store_base_path {
Some(path) => path,
None => {
let dir = std::env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
std::fs::create_dir(&path).unwrap();
path
}
};
let default_genesis = Genesis::get_default_genesis();
let genesis = match genesis {
Some(genesis) => genesis,
None => &default_genesis,
};

let store = Arc::new(AuthorityStore::open(&path.join("store"), None));
let mut checkpoints = CheckpointStore::open(
&path.join("checkpoints"),
None,
genesis_committee.epoch,
secret.public().into(),
secret.clone(),
)
.expect("Should not fail to open local checkpoint DB");
if let Some(consensus_sender) = consensus_sender {
checkpoints
.set_consensus(consensus_sender)
.expect("No issues");
}

let epochs = Arc::new(EpochStore::new(path.join("epochs")));

AuthorityState::new(
genesis_committee.clone(),
secret.public().into(),
secret.clone(),
store,
epochs,
None,
None,
Some(Arc::new(Mutex::new(checkpoints))),
genesis,
&prometheus::Registry::new(),
)
.await
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
pub async fn process_tx_recovery_log(&self, limit: Option<usize>) -> SuiResult {
let mut limit = limit.unwrap_or(usize::max_value());
Expand Down Expand Up @@ -1176,19 +1241,33 @@ impl AuthorityState {
self.epoch() + 1 == new_committee.epoch,
SuiError::from("Invalid new epoch to sign and update")
);
self.database.sign_new_epoch(
let cur_epoch = new_committee.epoch;
let latest_epoch = self.epoch_store.get_latest_authenticated_epoch();
fp_ensure!(
latest_epoch.epoch() + 1 == cur_epoch,
SuiError::from("Unexpected new epoch number")
);

let signed_epoch = SignedEpoch::new(
new_committee.clone(),
self.name,
&*self.secret,
next_checkpoint,
)?;
latest_epoch.epoch_info(),
);
self.epoch_store
.epochs
.insert(&cur_epoch, &AuthenticatedEpoch::Signed(signed_epoch))?;
// TODO: Do we want to make it possible to subscribe to committee changes?
self.committee.swap(Arc::new(new_committee));
Ok(())
}

pub(crate) fn promote_signed_epoch_to_cert(&self, cert: CertifiedEpoch) -> SuiResult {
self.database.store_epoch_cert(cert)
Ok(self.epoch_store.epochs.insert(
&cert.epoch_info.epoch(),
&AuthenticatedEpoch::Certified(cert),
)?)
}

#[cfg(test)]
Expand Down
67 changes: 0 additions & 67 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use sui_storage::{
};
use sui_types::batch::{SignedBatch, TxSequenceNumber};
use sui_types::crypto::{AuthoritySignInfo, EmptySignInfo};
use sui_types::messages::AuthenticatedEpoch;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::{Owner, OBJECT_START_VERSION};
use sui_types::{base_types::SequenceNumber, storage::ParentSync};
use tokio::sync::Notify;
Expand Down Expand Up @@ -1320,71 +1318,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.expect("Sui System State object deserialization cannot fail");
Ok(result)
}

// Epoch related functions

pub fn init_genesis_epoch(&self, genesis_committee: Committee) -> SuiResult {
assert_eq!(genesis_committee.epoch, 0);
let epoch_data = AuthenticatedEpoch::Genesis(GenesisEpoch::new(genesis_committee));
self.tables.epochs.insert(&0, &epoch_data)?;
Ok(())
}

/// This function should be called at the end of the epoch identified by `epoch`,
/// and after this call, we expect that the node's committee has changed
/// to `new_committee`.
pub fn sign_new_epoch(
&self,
new_committee: Committee,
authority: AuthorityName,
secret: &dyn signature::Signer<AuthoritySignature>,
next_checkpoint: CheckpointSequenceNumber,
) -> SuiResult {
let cur_epoch = new_committee.epoch;
let latest_epoch = self.get_latest_authenticated_epoch();
fp_ensure!(
latest_epoch.epoch() + 1 == cur_epoch,
SuiError::from("Unexpected new epoch number")
);

let signed_epoch = SignedEpoch::new(
new_committee,
authority,
secret,
next_checkpoint,
latest_epoch.epoch_info(),
);
self.tables
.epochs
.insert(&cur_epoch, &AuthenticatedEpoch::Signed(signed_epoch))?;
Ok(())
}

pub fn store_epoch_cert(&self, cert: CertifiedEpoch) -> SuiResult {
Ok(self.tables.epochs.insert(
&cert.epoch_info.epoch(),
&AuthenticatedEpoch::Certified(cert),
)?)
}

pub fn get_authenticated_epoch(
&self,
epoch_id: &EpochId,
) -> SuiResult<Option<AuthenticatedEpoch>> {
Ok(self.tables.epochs.get(epoch_id)?)
}

pub fn get_latest_authenticated_epoch(&self) -> AuthenticatedEpoch {
self.tables
.epochs
.iter()
.skip_to_last()
.next()
// unwrap safe because we guarantee there is at least a genesis epoch
// when initializing the store.
.unwrap()
.1
}
}

impl SuiDataStore<AuthoritySignInfo> {
Expand Down
5 changes: 0 additions & 5 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,4 @@ pub struct AuthorityStoreTables<S> {
/// by a single process acting as consensus (light) client. It is used to ensure the authority processes
/// every message output by consensus (and in the right order).
pub(crate) last_consensus_index: DBMap<u64, ExecutionIndices>,

/// Map from each epoch ID to the epoch information. The epoch is either signed by this node,
/// or is certified (signed by a quorum).
#[options(optimization = "point_lookup")]
pub(crate) epochs: DBMap<EpochId, AuthenticatedEpoch>,
}
Loading

0 comments on commit 4c5877d

Please sign in to comment.