Skip to content

Commit

Permalink
add AuthorityAggregatorBuilder (MystenLabs#5233)
Browse files Browse the repository at this point in the history
* use genesis.blob to set up remote bench

* add AuthorityAggregatorBuilder
  • Loading branch information
longbowlu authored Oct 17, 2022
1 parent 38658d0 commit d863344
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 207 deletions.
104 changes: 30 additions & 74 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use prometheus::Registry;
use rand::seq::SliceRandom;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use strum_macros::EnumString;
use sui_benchmark::drivers::bench_driver::BenchDriver;
use sui_benchmark::drivers::driver::Driver;
Expand All @@ -21,17 +20,9 @@ use sui_benchmark::workloads::workload::get_latest;
use sui_benchmark::workloads::{
make_combination_workload, make_shared_counter_workload, make_transfer_object_workload,
};
use sui_config::gateway::GatewayConfig;
use sui_config::Config;
use sui_config::PersistedConfig;
use sui_core::authority_aggregator::AuthAggMetrics;
use sui_core::authority_aggregator::{reconfig_from_genesis, AuthorityAggregator};
use sui_core::authority_client::make_authority_clients;
use sui_core::authority_aggregator::{reconfig_from_genesis, AuthorityAggregatorBuilder};
use sui_core::authority_client::AuthorityAPI;
use sui_core::authority_client::NetworkAuthorityClient;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::safe_client::SafeClientMetrics;
use sui_core::validator_info::make_committee;
use sui_node::metrics;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SuiAddress;
Expand All @@ -42,7 +33,6 @@ use sui_types::messages::BatchInfoResponseItem;
use sui_types::messages::TransactionInfoRequest;
use tracing::log::info;

use sui_core::authority_client::NetworkAuthorityClientMetrics;
use test_utils::authority::spawn_test_authorities;
use test_utils::authority::test_and_configure_authority_configs;
use test_utils::objects::generate_gas_objects_with_owner;
Expand All @@ -69,15 +59,16 @@ struct Opts {
pub num_client_threads: u64,
#[clap(long, default_value = "", global = true)]
pub log_path: String,
/// Path where gateway config is stored when running remote benchmark
/// This is also the path where gateway config is stored during local
/// benchmark
#[clap(long, default_value = "/tmp/gateway.yaml", global = true)]
pub gateway_config_path: String,
/// [Required for remote benchmark]
/// Path where genesis.blob is stored when running remote benchmark
#[clap(long, default_value = "/tmp/genesis.blob", global = true)]
pub genesis_blob_path: String,
/// [Required for remote benchmark]
/// Path where keypair for primary gas account is stored. The format of
/// this file is same as what `sui keytool generate` outputs
#[clap(long, default_value = "", global = true)]
pub keystore_path: String,
/// [Required for remote benchmark]
/// Object id of the primary gas coin used for benchmark
/// NOTE: THe remote network should have this coin in its genesis config
/// with large enough gas i.e. u64::MAX
Expand All @@ -87,7 +78,7 @@ struct Opts {
pub primary_gas_objects: u64,
/// Whether to run local or remote benchmark
/// NOTE: For running remote benchmark we must have the following
/// gateway_config_path, keypair_path and primary_gas_id
/// genesis_blob_path, keypair_path and primary_gas_id
#[clap(long, parse(try_from_str), default_value = "true", global = true)]
pub local: bool,
/// Default workload is 100% transfer object
Expand Down Expand Up @@ -253,12 +244,11 @@ async fn main() -> Result<()> {
config.log_file = Some(opts.log_path);
}
let _guard = config.with_env().init();
let registry: Registry = metrics::start_prometheus_server(
let registry: Arc<Registry> = Arc::new(metrics::start_prometheus_server(
format!("{}:{}", opts.client_metric_host, opts.client_metric_port)
.parse()
.unwrap(),
);
let network_authority_client_metrics = Arc::new(NetworkAuthorityClientMetrics::new(&registry));
));
let barrier = Arc::new(Barrier::new(2));
let cloned_barrier = barrier.clone();
let (primary_gas_id, owner, keypair, aggregator) = if opts.local {
Expand All @@ -274,39 +264,19 @@ async fn main() -> Result<()> {
});
Arc::new(configs)
};
let gateway_config = GatewayConfig {
epoch: 0,
validator_set: configs.validator_set().to_vec(),
send_timeout: Duration::from_secs(4),
recv_timeout: Duration::from_secs(4),
buffer_size: 650000,
db_folder_path: PathBuf::from("/tmp/client_db"),
};
gateway_config.save(&opts.gateway_config_path)?;

// bring up servers ..
let (owner, keypair): (SuiAddress, AccountKeyPair) = test_account_keys().pop().unwrap();
let primary_gas = generate_gas_objects_with_owner(1, owner);
let primary_gas_id = primary_gas.get(0).unwrap().id();
// Make the client runtime wait until we are done creating genesis objects
let cloned_config = configs;
let cloned_config = configs.clone();
let cloned_gas = primary_gas;
let auth_clients = make_authority_clients(
&gateway_config.validator_set,
gateway_config.send_timeout,
gateway_config.recv_timeout,
network_authority_client_metrics.clone(),
);

let committee = make_committee(gateway_config.epoch, &gateway_config.validator_set)?;
let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee));
let aggregator = Arc::new(AuthorityAggregator::new(
committee,
committee_store,
auth_clients.clone(),
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_authority_client_metrics.clone(),
));
let (aggregator, auth_clients) = AuthorityAggregatorBuilder::from_network_config(&configs)
.with_registry(registry.clone())
.build()
.unwrap();

// spawn a thread to spin up sui nodes on the multi-threaded server runtime
let _ = std::thread::spawn(move || {
Expand Down Expand Up @@ -341,7 +311,12 @@ async fn main() -> Result<()> {
join_all(follower_handles).await;
});
});
(primary_gas_id, owner, Arc::new(keypair), aggregator)
(
primary_gas_id,
owner,
Arc::new(keypair),
Arc::new(aggregator),
)
} else {
eprintln!("Configuring remote benchmark..");
std::thread::spawn(move || {
Expand All @@ -352,33 +327,13 @@ async fn main() -> Result<()> {
cloned_barrier.wait().await;
});
});
let config_path = Some(&opts.gateway_config_path)
.filter(|s| !s.is_empty())
.map(PathBuf::from)
.ok_or_else(|| {
anyhow!(format!(
"Failed to find gateway config at path: {}",
opts.gateway_config_path
))
})?;
let config: GatewayConfig = PersistedConfig::read(&config_path)?;
let committee = make_committee(config.epoch, &config.validator_set)?;
let genesis = sui_config::node::Genesis::new_from_file(&opts.genesis_blob_path);
let genesis = genesis.genesis()?;
let (aggregator, _) = AuthorityAggregatorBuilder::from_genesis(genesis)
.with_registry(registry.clone())
.build()
.unwrap();

let authority_clients = make_authority_clients(
&config.validator_set,
config.send_timeout,
config.recv_timeout,
network_authority_client_metrics.clone(),
);
let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee));
let aggregator = AuthorityAggregator::new(
committee,
committee_store,
authority_clients,
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_authority_client_metrics.clone(),
);
let aggregator = Arc::new(reconfig_from_genesis(aggregator).await?);
eprintln!(
"Reconfiguration - Reconfiguration to epoch {} is done",
Expand Down Expand Up @@ -426,6 +381,7 @@ async fn main() -> Result<()> {
let prev_benchmark_stats_path = opts.compare_with.clone();
let curr_benchmark_stats_path = opts.benchmark_stats_path.clone();
let arc_agg = aggregator.clone();
let registry_clone = registry.clone();
let handle = std::thread::spawn(move || {
client_runtime.block_on(async move {
match opts.run_spec {
Expand Down Expand Up @@ -502,7 +458,7 @@ async fn main() -> Result<()> {
let show_progress = interval.is_unbounded();
let driver = BenchDriver::new(stat_collection_interval);
driver
.run(workloads, arc_agg, &registry, show_progress, interval)
.run(workloads, arc_agg, &registry_clone, show_progress, interval)
.await
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-benchmark/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub fn get_ed25519_keypair_from_keystore(
let keystore = FileBasedKeystore::new(&keystore_path)?;
match keystore.get_key(requested_address) {
Ok(SuiKeyPair::Ed25519SuiKeyPair(kp)) => Ok(kp.copy()),
_ => Err(anyhow::anyhow!("Unsupported key type")),
other => Err(anyhow::anyhow!("Invalid key type: {:?}", other)),
}
}
7 changes: 5 additions & 2 deletions crates/sui-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use sui_config::SUI_KEYSTORE_FILENAME;
use sui_core::test_utils::test_authority_aggregator;
use sui_core::authority_aggregator::AuthorityAggregatorBuilder;
use test_utils::{
messages::get_gas_object_with_wallet_context, network::init_cluster_builder_env_aware,
};
Expand Down Expand Up @@ -76,7 +76,10 @@ mod test {
1, // transfer_object_weight
)];

let aggregator = Arc::new(test_authority_aggregator(swarm.config()));
let (aggregator, _) = AuthorityAggregatorBuilder::from_network_config(swarm.config())
.build()
.unwrap();
let aggregator = Arc::new(aggregator);

for w in workloads.iter_mut() {
w.workload.init(aggregator.clone()).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl Genesis {
}
}

fn genesis(&self) -> Result<&genesis::Genesis> {
pub fn genesis(&self) -> Result<&genesis::Genesis> {
match &self.location {
GenesisLocation::InPlace { genesis } => Ok(genesis),
GenesisLocation::File {
Expand Down
94 changes: 90 additions & 4 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
// SPDX-License-Identifier: Apache-2.0

use crate::authority_client::{
make_network_authority_client_sets_from_committee, AuthorityAPI, NetworkAuthorityClient,
NetworkAuthorityClientMetrics,
make_authority_clients, make_network_authority_client_sets_from_committee, AuthorityAPI,
NetworkAuthorityClient, NetworkAuthorityClientMetrics,
};
use crate::safe_client::{SafeClient, SafeClientMetrics};
use crate::validator_info::make_committee;
use async_trait::async_trait;

use futures::{future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use move_core_types::value::MoveStructLayout;
use mysten_network::config::Config;
use sui_network::default_mysten_network_config;
use sui_types::crypto::AuthoritySignature;
use sui_config::genesis::Genesis;
use sui_config::NetworkConfig;
use sui_network::{
default_mysten_network_config, DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC,
};
use sui_types::crypto::{AuthorityPublicKeyBytes, AuthoritySignature};
use sui_types::object::{Object, ObjectFormatOptions, ObjectRead};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::{
Expand All @@ -32,6 +37,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Instrument};

use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry, Histogram, IntCounter,
Registry,
};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::string::ToString;
Expand Down Expand Up @@ -2282,3 +2288,83 @@ pub async fn reconfig_from_genesis(
// Now transit from latest_epoch - 1 to latest_epoch
aggregator.recreate_with_net_addresses(latest_committee, &network_config)
}

pub struct AuthorityAggregatorBuilder<'a> {
network_config: Option<&'a NetworkConfig>,
genesis: Option<&'a Genesis>,
committee_store: Option<Arc<CommitteeStore>>,
registry: Option<Arc<Registry>>,
}

impl<'a> AuthorityAggregatorBuilder<'a> {
pub fn from_network_config(config: &'a NetworkConfig) -> Self {
Self {
network_config: Some(config),
genesis: None,
committee_store: None,
registry: None,
}
}

pub fn from_genesis(genesis: &'a Genesis) -> Self {
Self {
network_config: None,
genesis: Some(genesis),
committee_store: None,
registry: None,
}
}

pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
self.committee_store = Some(committee_store);
self
}

pub fn with_registry(mut self, registry: Arc<Registry>) -> Self {
self.registry = Some(registry);
self
}

pub fn build(
self,
) -> anyhow::Result<(
AuthorityAggregator<NetworkAuthorityClient>,
BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
)> {
let validator_info = if let Some(network_config) = self.network_config {
network_config.validator_set()
} else if let Some(genesis) = self.genesis {
genesis.validator_set()
} else {
anyhow::bail!("need either NetworkConfig or Genesis.");
};
let committee = make_committee(0, validator_info)?;
let registry = self
.registry
.unwrap_or_else(|| Arc::new(prometheus::Registry::new()));
let network_metrics = Arc::new(NetworkAuthorityClientMetrics::new(&registry));

let auth_clients = make_authority_clients(
validator_info,
DEFAULT_CONNECT_TIMEOUT_SEC,
DEFAULT_REQUEST_TIMEOUT_SEC,
network_metrics.clone(),
);
let committee_store = if let Some(committee_store) = self.committee_store {
committee_store
} else {
Arc::new(CommitteeStore::new_for_testing(&committee))
};
Ok((
AuthorityAggregator::new(
committee,
committee_store,
auth_clients.clone(),
AuthAggMetrics::new(&registry),
Arc::new(SafeClientMetrics::new(&registry)),
network_metrics,
),
auth_clients,
))
}
}
8 changes: 4 additions & 4 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ pub fn make_network_authority_client_sets_from_genesis(

pub fn make_authority_clients(
validator_set: &[ValidatorInfo],
send_timeout: Duration,
recv_timeout: Duration,
connect_timeout: Duration,
request_timeout: Duration,
net_metrics: Arc<NetworkAuthorityClientMetrics>,
) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
let mut authority_clients = BTreeMap::new();
let mut network_config = mysten_network::config::Config::new();
network_config.connect_timeout = Some(send_timeout);
network_config.request_timeout = Some(recv_timeout);
network_config.connect_timeout = Some(connect_timeout);
network_config.request_timeout = Some(request_timeout);
for authority in validator_set {
let channel = network_config
.connect_lazy(authority.network_address())
Expand Down
9 changes: 6 additions & 3 deletions crates/sui-core/src/node_sync/node_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ where
mod test {
use super::*;
use crate::{
authority_active::gossip::GossipMetrics, authority_client::NetworkAuthorityClient,
node_sync::SyncStatus, test_utils::test_authority_aggregator,
authority_active::gossip::GossipMetrics, authority_aggregator::AuthorityAggregatorBuilder,
authority_client::NetworkAuthorityClient, node_sync::SyncStatus,
};
use std::sync::{Arc, Mutex};
use sui_macros::sim_test;
Expand Down Expand Up @@ -454,7 +454,10 @@ mod test {
// Set up an authority
let config = test_and_configure_authority_configs(1);
let authorities = spawn_test_authorities(objects, &config).await;
let net = Arc::new(test_authority_aggregator(&config));
let (agg, _) = AuthorityAggregatorBuilder::from_network_config(&config)
.build()
.unwrap();
let net = Arc::new(agg);

execute_transactions(&net, &transactions).await;

Expand Down
Loading

0 comments on commit d863344

Please sign in to comment.