Skip to content

Commit

Permalink
network: use 'multiaddr' for validator interface
Browse files Browse the repository at this point in the history
This patch converts the validator network interface to use and
understand multiaddr in place of using a (host, port) tuple.
  • Loading branch information
bmwill committed May 10, 2022
1 parent db3ea8f commit e403158
Show file tree
Hide file tree
Showing 24 changed files with 461 additions and 388 deletions.
269 changes: 268 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion crates/sui-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@

pub mod api;
pub mod codec;
pub mod network;

pub use tonic;
85 changes: 0 additions & 85 deletions crates/sui-network/src/network.rs

This file was deleted.

12 changes: 4 additions & 8 deletions faucet/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use anyhow::{anyhow, bail};
use std::path::Path;
use sui::{
config::{
AuthorityInfo, Config, GatewayConfig, GatewayType, GenesisConfig, WalletConfig,
SUI_GATEWAY_CONFIG, SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG,
Config, GatewayConfig, GatewayType, GenesisConfig, WalletConfig, SUI_GATEWAY_CONFIG,
SUI_NETWORK_CONFIG, SUI_WALLET_CONFIG,
},
keystore::KeystoreType,
sui_commands::{genesis, SuiNetwork},
Expand Down Expand Up @@ -78,11 +78,7 @@ pub async fn start_test_network(
bail!("genesis_config's authority num should match key_pairs's length.");
}

let authorities = genesis_config
.authorities
.into_iter()
.map(|info| AuthorityInfo { port: 0, ..info })
.collect();
let authorities = genesis_config.authorities.into_iter().collect();
genesis_config.authorities = authorities;

let (network_config, accounts, mut keystore) = genesis(genesis_config, None).await?;
Expand All @@ -99,7 +95,7 @@ pub async fn start_test_network(
.into_iter()
.zip(&network.spawned_authorities)
.map(|(mut info, server)| {
info.port = server.get_port();
info.network_address = server.address().to_owned();
info
})
.collect::<Vec<_>>();
Expand Down
2 changes: 2 additions & 0 deletions sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ serde-name = "0.2.0"
dirs = "4.0.0"
clap = { version = "3.1.17", features = ["derive"] }
telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "3b7daedf91fd8937dde26e905b8114cac459b866" }
multiaddr = "0.14.0"
mysten-network = { git = "https://github.com/MystenLabs/mysten-infra", rev = "3b7daedf91fd8937dde26e905b8114cac459b866" }

bcs = "0.1.3"
sui_core = { path = "../sui_core" }
Expand Down
80 changes: 26 additions & 54 deletions sui/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ use crate::benchmark::{
validator_preparer::ValidatorPreparer,
};
use futures::{join, StreamExt};
use multiaddr::Multiaddr;
use rayon::{iter::ParallelIterator, prelude::*};
use std::{panic, thread, thread::sleep, time::Duration};
use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
use sui_network::{
network::{NetworkClient, NetworkServer},
tonic,
};
use sui_types::{
batch::UpdateItem,
messages::{BatchInfoRequest, BatchInfoResponseItem},
Expand All @@ -43,14 +40,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
BenchmarkType::MicroBenchmark { host, port, type_ } => (host, port, type_),
};

let network_client = NetworkClient::new(
host.clone(),
port,
benchmark.buffer_size,
Duration::from_micros(benchmark.send_timeout_us),
Duration::from_micros(benchmark.recv_timeout_us),
);
let network_server = NetworkServer::new(host, port, benchmark.buffer_size);
let address: Multiaddr = format!("/dns/{host}/tcp/{port}/http").parse().unwrap();
let connections = if benchmark.tcp_connections > 0 {
benchmark.tcp_connections
} else {
Expand All @@ -60,14 +50,12 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
benchmark.running_mode,
benchmark.working_dir,
benchmark.committee_size,
network_client.base_address(),
network_client.base_port(),
address.clone(),
benchmark.db_cpus,
);
match type_ {
MicroBenchmarkType::Throughput { num_transactions } => run_throughout_microbench(
network_client,
network_server,
address,
connections,
benchmark.batch_size,
!benchmark.use_native,
Expand All @@ -79,8 +67,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
chunk_size,
period_us,
} => run_latency_microbench(
network_client,
network_server,
address,
connections,
!benchmark.use_native,
num_chunks,
Expand All @@ -92,8 +79,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
}

fn run_throughout_microbench(
network_client: NetworkClient,
network_server: NetworkServer,
address: Multiaddr,
connections: usize,
batch_size: usize,
use_move: bool,
Expand Down Expand Up @@ -124,21 +110,20 @@ fn run_throughout_microbench(
&mut validator_preparer,
);

validator_preparer.deploy_validator(network_server);
validator_preparer.deploy_validator(address.clone());

let result = panic::catch_unwind(|| {
// Follower to observe batches
let follower_network_client = network_client.clone();
let addr = address.clone();
thread::spawn(move || {
get_multithread_runtime()
.block_on(async move { run_follower(follower_network_client).await });
get_multithread_runtime().block_on(async move { run_follower(addr).await });
});

sleep(Duration::from_secs(3));

// Run load
let (elapsed, resp) = get_multithread_runtime()
.block_on(async move { send_tx_chunks(txes, network_client, connections).await });
.block_on(async move { send_tx_chunks(txes, address, connections).await });

let _: Vec<_> = resp
.into_par_iter()
Expand All @@ -160,8 +145,7 @@ fn run_throughout_microbench(
}

fn run_latency_microbench(
network_client: NetworkClient,
_network_server: NetworkServer,
address: Multiaddr,
connections: usize,
use_move: bool,
num_chunks: usize,
Expand Down Expand Up @@ -193,22 +177,23 @@ fn run_latency_microbench(
let tracer_txes =
tx_cr.generate_transactions(1, use_move, 1, num_chunks, None, &mut validator_preparer);

validator_preparer.deploy_validator(_network_server);
validator_preparer.deploy_validator(address.clone());

let result = panic::catch_unwind(|| {
let runtime = get_multithread_runtime();
// Prep the generators
let (mut load_gen, mut tracer_gen) = runtime.block_on(async move {
join!(
FixedRateLoadGenerator::new(
load_gen_txes,
period_us,
network_client.clone(),
connections,
),
FixedRateLoadGenerator::new(tracer_txes, period_us, network_client, 1),
)
});
let (mut load_gen, mut tracer_gen) =
runtime.block_on(async move {
join!(
FixedRateLoadGenerator::new(
load_gen_txes,
period_us,
address.clone(),
connections,
),
FixedRateLoadGenerator::new(tracer_txes, period_us, address.clone(), 1),
)
});

// Run the load gen and tracers
let (load_latencies, tracer_latencies) =
Expand All @@ -231,23 +216,10 @@ fn run_latency_microbench(
}
}

async fn run_follower(network_client: NetworkClient) {
async fn run_follower(address: Multiaddr) {
// We spawn a second client that listens to the batch interface
let _batch_client_handle = tokio::task::spawn(async move {
let uri = format!(
"http://{}:{}",
network_client.base_address(),
network_client.base_port()
)
.parse()
.unwrap();
let channel = tonic::transport::Channel::builder(uri)
.connect_timeout(network_client.send_timeout())
.timeout(network_client.recv_timeout())
.connect()
.await
.unwrap();
let authority_client = NetworkAuthorityClient::with_channel(channel, network_client);
let authority_client = NetworkAuthorityClient::connect(&address).await.unwrap();

let mut start = 0;

Expand Down
Loading

0 comments on commit e403158

Please sign in to comment.