Skip to content

Commit

Permalink
Add epoch store to SafeClient (MystenLabs#3856)
Browse files Browse the repository at this point in the history
* Add epoch store to SafeClient

* feedback
  • Loading branch information
lxfind authored Aug 12, 2022
1 parent 340f8a4 commit 95796a5
Show file tree
Hide file tree
Showing 22 changed files with 208 additions and 138 deletions.
8 changes: 5 additions & 3 deletions crates/sui-benchmark/src/benchmark/validator_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,14 @@ fn make_authority_state(
// manually.
// opts.set_manual_wal_flush(true);

let store = Arc::new(AuthorityStore::open(&path.join("store"), Some(opts)));
let epoch_store = Arc::new(EpochStore::new(path.join("epochs")));
let store = Arc::new(AuthorityStore::open(
&path.join("store"),
Some(opts.clone()),
));
let epoch_store = Arc::new(EpochStore::new(path.join("epochs"), committee, Some(opts)));
(
Runtime::new().unwrap().block_on(async {
AuthorityState::new(
committee.clone(),
*pubx,
Arc::pin(secx),
store.clone(),
Expand Down
11 changes: 8 additions & 3 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use sui_benchmark::workloads::workload::CombinationWorkload;
use sui_benchmark::workloads::workload::Payload;
use sui_benchmark::workloads::workload::Workload;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_core::epoch::epoch_store::EpochStore;
use sui_quorum_driver::QuorumDriverHandler;
use sui_sdk::crypto::FileBasedKeystore;
use sui_types::crypto::EncodeDecodeBase64;
Expand Down Expand Up @@ -118,7 +119,7 @@ struct Opts {
pub client_metric_port: u16,
/// Number of followers to run. This also stresses the follower logic in validators
#[clap(long, default_value = "0", global = true)]
pub num_folowers: u64,
pub num_followers: u64,
/// Whether or not to download TXes during follow
#[clap(long, global = true)]
pub download_txes: bool,
Expand All @@ -131,7 +132,7 @@ pub enum OptWorkloadSpec {
// Allow the ability to mix shared object and
// single owner transactions in the benchmarking
// framework. Currently, only shared counter
// and transfer obejct transaction types are
// and transfer object transaction types are
// supported but there will be more in future. Also
// there is no dependency between individual
// transactions such that they can all be executed
Expand Down Expand Up @@ -559,7 +560,7 @@ async fn main() -> Result<()> {
let mut follower_handles = vec![];

// Start the followers if any
for idx in 0..opts.num_folowers {
for idx in 0..opts.num_followers {
// Kick off a task which follows all authorities and discards the data
for (name, auth_client) in auth_clients.clone() {
follower_handles.push(tokio::task::spawn(async move {
Expand Down Expand Up @@ -599,8 +600,10 @@ async fn main() -> Result<()> {
let committee = GatewayState::make_committee(&config)?;
let authority_clients = GatewayState::make_authority_clients(&config);
let registry = prometheus::Registry::new();
let epoch_store = Arc::new(EpochStore::new_for_testing(&committee));
let aggregator = AuthorityAggregator::new(
committee,
epoch_store,
authority_clients,
AuthAggMetrics::new(&registry),
SafeClientMetrics::new(&registry),
Expand Down Expand Up @@ -658,8 +661,10 @@ async fn main() -> Result<()> {
.parse()
.unwrap(),
);
let epoch_store = Arc::new(EpochStore::new_for_testing(&committee));
let aggregator = AuthorityAggregator::new(
committee,
epoch_store,
authority_clients,
AuthAggMetrics::new(&registry),
SafeClientMetrics::new(&registry),
Expand Down
32 changes: 16 additions & 16 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ pub struct AuthorityState {
/// The checkpoint store
pub checkpoints: Option<Arc<Mutex<CheckpointStore>>>,

pub(crate) epoch_store: Arc<EpochStore>,
epoch_store: Arc<EpochStore>,

// Structures needed for handling batching and notifications.
/// The sender to notify of new transactions
Expand Down Expand Up @@ -346,6 +346,10 @@ impl AuthorityState {
self.committee.load().epoch
}

/// Returns a reference to the `Arc<EpochStore>` held by this authority state.
///
/// Callers that need their own handle clone the returned `Arc`.
pub fn epoch_store(&self) -> &Arc<EpochStore> {
&self.epoch_store
}

async fn handle_transaction_impl(
&self,
transaction: Transaction,
Expand Down Expand Up @@ -1017,7 +1021,6 @@ impl AuthorityState {
// TODO: This function takes both committee and genesis as parameter.
// Technically genesis already contains committee information. Could consider merging them.
pub async fn new(
genesis_committee: Committee,
name: AuthorityName,
secret: StableSyncAuthoritySigner,
store: Arc<AuthorityStore>,
Expand Down Expand Up @@ -1046,18 +1049,11 @@ impl AuthorityState {
.expect("Cannot bulk insert genesis objects");
}

let committee = if epoch_store.database_is_empty() {
epoch_store
.init_genesis_epoch(genesis_committee.clone())
.expect("Init genesis epoch data must not fail");
genesis_committee
} else {
epoch_store
.get_latest_authenticated_epoch()
.epoch_info()
.committee()
.clone()
};
let committee = epoch_store
.get_latest_authenticated_epoch()
.epoch_info()
.committee()
.clone();

let event_handler = event_store.map(|es| Arc::new(EventHandler::new(store.clone(), es)));

Expand Down Expand Up @@ -1134,6 +1130,7 @@ impl AuthorityState {
state
}

// TODO: Technically genesis_committee can be derived from genesis.
pub async fn new_for_testing(
genesis_committee: Committee,
key: &AuthorityKeyPair,
Expand Down Expand Up @@ -1172,10 +1169,13 @@ impl AuthorityState {
.expect("No issues");
}

let epochs = Arc::new(EpochStore::new(path.join("epochs")));
let epochs = Arc::new(EpochStore::new(
path.join("epochs"),
&genesis_committee,
None,
));

AuthorityState::new(
genesis_committee.clone(),
secret.public().into(),
secret.clone(),
store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::authority::AuthorityState;
use crate::authority_aggregator::authority_aggregator_tests::*;
use crate::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use crate::epoch::epoch_store::EpochStore;
use crate::safe_client::SafeClient;
use async_trait::async_trait;
use std::borrow::Borrow;
Expand Down Expand Up @@ -238,9 +239,10 @@ pub async fn init_configurable_authorities(
}
states.push(client.state.clone());
names.push(authority_name);
let epoch_store = client.state.epoch_store().clone();
clients.push(SafeClient::new(
client,
committee.clone(),
epoch_store,
authority_name,
SafeClientMetrics::new_for_tests(),
));
Expand Down Expand Up @@ -319,8 +321,10 @@ pub async fn init_configurable_authorities(
.into_iter()
.map(|(name, client)| (name, client.authority_client().clone()))
.collect();
let epoch_store = Arc::new(EpochStore::new_for_testing(&committee));
let net = AuthorityAggregator::new(
committee,
epoch_store,
authority_clients,
AuthAggMetrics::new_for_tests(),
SafeClientMetrics::new_for_tests(),
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use prometheus::{
};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::string::ToString;
use std::sync::Arc;
use std::time::Duration;
use sui_types::committee::StakeUnit;
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, timeout};

use crate::epoch::epoch_store::EpochStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::TapFallible;

Expand Down Expand Up @@ -157,12 +159,14 @@ pub struct AuthorityAggregator<A> {
impl<A> AuthorityAggregator<A> {
pub fn new(
committee: Committee,
epoch_store: Arc<EpochStore>,
authority_clients: BTreeMap<AuthorityName, A>,
metrics: AuthAggMetrics,
safe_client_metrics: SafeClientMetrics,
) -> Self {
Self::new_with_timeouts(
committee,
epoch_store,
authority_clients,
metrics,
safe_client_metrics,
Expand All @@ -172,19 +176,25 @@ impl<A> AuthorityAggregator<A> {

pub fn new_with_timeouts(
committee: Committee,
epoch_store: Arc<EpochStore>,
authority_clients: BTreeMap<AuthorityName, A>,
metrics: AuthAggMetrics,
safe_client_metrics: SafeClientMetrics,
timeouts: TimeoutConfig,
) -> Self {
Self {
committee: committee.clone(),
committee,
authority_clients: authority_clients
.into_iter()
.map(|(name, api)| {
(
name,
SafeClient::new(api, committee.clone(), name, safe_client_metrics.clone()),
SafeClient::new(
api,
epoch_store.clone(),
name,
safe_client_metrics.clone(),
),
)
})
.collect(),
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sui_types::{
};

use crate::authority_aggregator::AuthAggMetrics;
use crate::epoch::epoch_store::EpochStore;
use parking_lot::Mutex;

pub struct TestCausalOrderPendCertNoop;
Expand Down Expand Up @@ -1668,6 +1669,7 @@ pub async fn checkpoint_tests_setup(
// Now make an authority aggregator
let aggregator = AuthorityAggregator::new(
committee.clone(),
Arc::new(EpochStore::new_for_testing(&committee)),
authorities
.iter()
.map(|a| {
Expand Down
23 changes: 19 additions & 4 deletions crates/sui-core/src/epoch/epoch_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use rocksdb::Options;
use std::path::PathBuf;
use sui_types::base_types::ObjectID;
use sui_types::committee::{Committee, EpochId};
use sui_types::error::SuiResult;
use sui_types::messages::{AuthenticatedEpoch, GenesisEpoch};
Expand All @@ -19,12 +21,21 @@ pub struct EpochStore {
}

impl EpochStore {
pub fn new(path: PathBuf) -> Self {
Self::open_tables_read_write(path, None)
/// Opens the epoch tables at `path` in read-write mode, passing `db_options` through
/// to `open_tables_read_write`, and returns the resulting store.
///
/// If the opened database has no epochs yet, the genesis epoch is initialized from a
/// clone of `genesis_committee`; otherwise `genesis_committee` is not used.
///
/// # Panics
///
/// Panics if initializing the genesis epoch fails.
pub fn new(path: PathBuf, genesis_committee: &Committee, db_options: Option<Options>) -> Self {
let epoch_store = Self::open_tables_read_write(path, db_options);
// Only a brand-new (empty) database gets the genesis epoch written.
if epoch_store.database_is_empty() {
epoch_store
.init_genesis_epoch(genesis_committee.clone())
.expect("Init genesis epoch data must not fail");
}
epoch_store
}

pub fn database_is_empty(&self) -> bool {
self.epochs.iter().next().is_none()
/// Builds a store for tests, backed by a newly created directory under the system
/// temp dir.
///
/// The directory is named `DB_` followed by the `Debug` rendering of a random
/// `ObjectID`, and the store is opened through `Self::new` with no custom database
/// options. Nothing in this function removes the directory afterwards.
///
/// # Panics
///
/// Panics if the directory cannot be created.
pub fn new_for_testing(genesis_committee: &Committee) -> Self {
let dir = std::env::temp_dir();
// Random suffix so each call gets its own directory.
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
std::fs::create_dir(&path).unwrap();
Self::new(path, genesis_committee, None)
}

pub fn init_genesis_epoch(&self, genesis_committee: Committee) -> SuiResult {
Expand All @@ -51,4 +62,8 @@ impl EpochStore {
.unwrap()
.1
}

/// Returns `true` when iterating over `self.epochs` yields no entry.
fn database_is_empty(&self) -> bool {
self.epochs.iter().next().is_none()
}
}
38 changes: 15 additions & 23 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,20 @@ where
}

// Reconnect the network if we have a type of AuthorityClient that has a network.
if A::needs_network_recreation() {
self.recreate_network(sui_system_state, new_committee)?;
let new_clients = if A::needs_network_recreation() {
self.recreate_network(sui_system_state)?
} else {
// update the authorities with the new committee
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
self.net.load().clone_inner_clients(),
self.net.load().metrics.clone(),
self.net.load().safe_client_metrics.clone(),
));
self.net.store(new_net);
}
self.net.load().clone_inner_clients()
};
// Replace the clients in the authority aggregator with new clients.
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
self.state.epoch_store().clone(),
new_clients,
self.net.load().metrics.clone(),
self.net.load().safe_client_metrics.clone(),
));
self.net.store(new_net);

// TODO: Update all committee in all components safely,
// potentially restart narwhal committee/consensus adapter,
Expand Down Expand Up @@ -207,8 +209,7 @@ where
pub fn recreate_network(
&self,
sui_system_state: SuiSystemState,
new_committee: Committee,
) -> SuiResult {
) -> SuiResult<BTreeMap<AuthorityName, A>> {
let mut new_clients = BTreeMap::new();
let next_epoch_validators = sui_system_state.validators.next_epoch_validators;

Expand Down Expand Up @@ -266,16 +267,7 @@ where
);
new_clients.insert(public_key_bytes, client);
}

// Replace the clients in the authority aggregator with new clients.
let new_net = Arc::new(AuthorityAggregator::new(
new_committee,
new_clients,
self.net.load().metrics.clone(),
self.net.load().safe_client_metrics.clone(),
));
self.net.store(new_net);
Ok(())
Ok(new_clients)
}

async fn wait_for_epoch_cert(
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn test_start_epoch_change() {
let state = states[0].clone();

// Check that we initialized the genesis epoch.
let init_epoch = state.epoch_store.get_latest_authenticated_epoch();
let init_epoch = state.epoch_store().get_latest_authenticated_epoch();
assert!(matches!(init_epoch, AuthenticatedEpoch::Genesis(..)));
assert_eq!(init_epoch.epoch(), 0);

Expand Down Expand Up @@ -220,7 +220,7 @@ async fn test_finish_epoch_change() {
for active in actives {
assert_eq!(active.state.epoch(), 1);
assert_eq!(active.net.load().committee.epoch, 1);
let latest_epoch = active.state.epoch_store.get_latest_authenticated_epoch();
let latest_epoch = active.state.epoch_store().get_latest_authenticated_epoch();
assert_eq!(latest_epoch.epoch(), 1);
assert!(matches!(latest_epoch, AuthenticatedEpoch::Certified(..)));
assert_eq!(latest_epoch.epoch_info().epoch(), 1);
Expand Down
Loading

0 comments on commit 95796a5

Please sign in to comment.