WIP: Modularize benches (MystenLabs#1321)
* Modularize benches
oxade authored Apr 14, 2022
1 parent cef1d29 commit 85ab554
Showing 12 changed files with 971 additions and 1,194 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bench.yml
@@ -70,7 +70,7 @@ jobs:
       - name: run benchmark
         run: |
           set -o pipefail
-          cargo run --release --bin microbench 2>&1 | huniq | tee -a artifacts/bench_results.txt
+          cargo run --release --bin bench microbench throughput 2>&1 | huniq | tee -a artifacts/bench_results.txt
       - name: retrieve benchmark results
         id: get-comment-body
         run: |
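
Note: the two old binaries (`microbench`, `microbench_latency`) are folded into a single `bench` binary whose positional arguments select the benchmark family and type. For reference, the two invocations implied by this commit, with flags copied from the workflow and sweep-script diffs:

    cargo run --release --bin bench microbench throughput
    cargo run --release --bin bench microbench latency --period-us 1000 --chunk-size 200 --num-chunks 10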
2 changes: 1 addition & 1 deletion network_utils/src/network.rs
@@ -19,7 +19,7 @@ use futures::StreamExt;
 use tokio::task::JoinError;
 use tokio::time;
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct NetworkClient {
     base_address: String,
     base_port: u16,
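Note: the only change here is the added `Debug` derive. Presumably the new benchmark code formats a `NetworkClient`, directly or inside a containing struct; in Rust, a containing type can only derive `Debug` if every field implements it. A minimal illustration of that mechanic (the `BenchSetup` wrapper is hypothetical, not from this commit):

    #[derive(Clone, Debug)]
    pub struct NetworkClient { /* fields elided for the sketch */ }

    // This derive compiles only because NetworkClient now implements Debug.
    #[derive(Debug)]
    struct BenchSetup {
        client: NetworkClient,
    }

    fn main() {
        let setup = BenchSetup { client: NetworkClient {} };
        println!("{:?}", setup); // BenchSetup { client: NetworkClient }
    }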
12 changes: 6 additions & 6 deletions scripts/bench_sweep.py
@@ -5,7 +5,7 @@
 from string import Template
 
 cmd_template = Template(
-    "../target/release/microbench_latency --period-us $period_us --chunk-size $chunk_size --num-chunks $num_chunks")
+    "../target/release/bench microbench latency --period-us $period_us --chunk-size $chunk_size --num-chunks $num_chunks")
 
 def get_avg_latency(period_us, chunk_size, num_chunks):
     cmd = cmd_template.substitute(
@@ -15,11 +15,11 @@ def get_avg_latency(period_us, chunk_size, num_chunks):
     output, error = process.communicate()
 
     resp = output.decode("utf-8")
-    res = ast.literal_eval(resp)
 
-    # Pick upper half at steady state
-    res = res[len(res)//2:]
-    return sum(res)/len(res)
+    # Example output: `Average Latency 6577.06 us @ 100000 tps`
+    res = float(resp.split(" ")[2])
+    print(res)
+    return res
 
 
 def plot(vals):
@@ -32,7 +32,7 @@ def plot(vals):
 lats = []
 for i in range(10):
     chunk_size = 200 * (i+1)
-    period_us = 10000
+    period_us = 1000
     num_chunks = 10
     thr = chunk_size*1000*1000/period_us
     avg_lat_ms = get_avg_latency(period_us, chunk_size, num_chunks)/1000
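
Note: as context for the sweep above, a chunk of `chunk_size` transactions is dispatched every `period_us` microseconds, so the offered load (the script's `thr`) is `chunk_size * 1_000_000 / period_us` transactions per second. A small sketch of that arithmetic with the loop's values (Rust used here purely for illustration):

    fn main() {
        let period_us: u64 = 1_000; // one chunk every millisecond, per the sweep above
        for i in 1..=10u64 {
            let chunk_size = 200 * i; // 200, 400, ..., 2000 transactions per chunk
            let tps = chunk_size * 1_000_000 / period_us;
            println!("chunk_size={chunk_size} -> offered load {tps} tps");
        }
    }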
8 changes: 2 additions & 6 deletions sui/Cargo.toml
@@ -67,12 +67,8 @@ reqwest = { version = "0.11.10", features=["json","serde_json", "blocking"]}
 tracing-test = "0.2.1"
 
 [[bin]]
-name = "microbench"
-path = "src/microbench.rs"
-
-[[bin]]
-name = "microbench_latency"
-path = "src/microbench_latency.rs"
+name = "bench"
+path = "src/bench.rs"
 
 [[bin]]
 name = "wallet"
22 changes: 22 additions & 0 deletions sui/src/bench.rs
@@ -0,0 +1,22 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![deny(warnings)]

use clap::*;

use sui::benchmark::{bench_types, run_benchmark};
use tracing::subscriber::set_global_default;
use tracing_subscriber::EnvFilter;

fn main() {
    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let subscriber_builder =
        tracing_subscriber::fmt::Subscriber::builder().with_env_filter(env_filter);
    let subscriber = subscriber_builder.with_writer(std::io::stderr).finish();
    set_global_default(subscriber).expect("Failed to set subscriber");
    let benchmark = bench_types::Benchmark::parse();

    let r = run_benchmark(benchmark);
    println!("{}", r);
}
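
Note: the `Benchmark` type parsed above lives in `bench_types`, a new module this view does not show. Its rough shape can be read off the destructuring in sui/src/benchmark.rs below; the following is a hedged reconstruction, not the actual file — field names come from usage, while every clap attribute, default, and type is an assumption:

    // Hypothetical reconstruction of sui/src/benchmark/bench_types.rs from usage.
    use clap::{Parser, Subcommand};

    #[derive(Parser)]
    pub struct Benchmark {
        // The real file presumably gives these long flags and default values.
        #[clap(long)]
        pub committee_size: usize,
        #[clap(long)]
        pub db_cpus: usize,
        #[clap(long)]
        pub use_move: bool,
        #[clap(long)]
        pub batch_size: usize,
        #[clap(long)]
        pub buffer_size: usize,
        #[clap(long)]
        pub send_timeout_us: u64,
        #[clap(long)]
        pub recv_timeout_us: u64,
        #[clap(long)]
        pub tcp_connections: usize,
        #[clap(subcommand)]
        pub bench_type: BenchmarkType,
    }

    #[derive(Subcommand)]
    pub enum BenchmarkType {
        // Named to match the CI invocation `bench microbench ...`.
        #[clap(name = "microbench")]
        MicroBenchmark {
            #[clap(long)]
            host: String,
            #[clap(long)]
            port: u16,
            #[clap(subcommand)]
            type_: MicroBenchmarkType,
        },
    }

    #[derive(Subcommand)]
    pub enum MicroBenchmarkType {
        // `bench microbench throughput`
        Throughput {
            #[clap(long)]
            num_transactions: usize,
        },
        // `bench microbench latency --period-us ... --chunk-size ... --num-chunks ...`
        Latency {
            #[clap(long)]
            num_chunks: usize,
            #[clap(long)]
            chunk_size: usize,
            #[clap(long)]
            period_us: u64,
        },
    }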
267 changes: 267 additions & 0 deletions sui/src/benchmark.rs
@@ -0,0 +1,267 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![deny(warnings)]

use futures::{join, StreamExt};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::thread;
use std::{thread::sleep, time::Duration};
use sui_core::authority_client::AuthorityClient;
use sui_network::network::{NetworkClient, NetworkServer};
use sui_types::batch::UpdateItem;
use sui_types::messages::{BatchInfoRequest, BatchInfoResponseItem};
use sui_types::serialize::*;
use tokio::runtime::{Builder, Runtime};
use tracing::*;

pub mod bench_types;
pub mod load_generator;
pub mod transaction_creator;
use crate::benchmark::bench_types::{Benchmark, BenchmarkType};
use crate::benchmark::load_generator::{
    calculate_throughput, check_transaction_response, send_tx_chunks, spawn_authority_server,
    FixedRateLoadGenerator,
};
use crate::benchmark::transaction_creator::TransactionCreator;

use self::bench_types::{BenchmarkResult, MicroBenchmarkResult, MicroBenchmarkType};

const FOLLOWER_BATCH_SIZE: u64 = 10_000;

pub fn run_benchmark(benchmark: Benchmark) -> BenchmarkResult {
    // Only microbenchmarks are supported for now
    BenchmarkResult::MicroBenchmark(run_microbenchmark(benchmark))
}

fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
    let (host, port, type_) = match benchmark.bench_type {
        BenchmarkType::MicroBenchmark { host, port, type_ } => (host, port, type_),
    };

    let network_client = NetworkClient::new(
        host.clone(),
        port,
        benchmark.buffer_size,
        Duration::from_micros(benchmark.send_timeout_us),
        Duration::from_micros(benchmark.recv_timeout_us),
    );
    let network_server = NetworkServer::new(host, port, benchmark.buffer_size);
    let connections = if benchmark.tcp_connections > 0 {
        benchmark.tcp_connections
    } else {
        num_cpus::get()
    };

    match type_ {
        MicroBenchmarkType::Throughput { num_transactions } => run_throughput_microbench(
            network_client,
            network_server,
            connections,
            benchmark.batch_size,
            benchmark.use_move,
            num_transactions,
            benchmark.committee_size,
            benchmark.db_cpus,
        ),
        MicroBenchmarkType::Latency {
            num_chunks,
            chunk_size,
            period_us,
        } => run_latency_microbench(
            network_client,
            network_server,
            connections,
            benchmark.use_move,
            benchmark.committee_size,
            benchmark.db_cpus,
            num_chunks,
            chunk_size,
            period_us,
        ),
    }
}

fn run_throughput_microbench(
    network_client: NetworkClient,
    network_server: NetworkServer,
    connections: usize,
    batch_size: usize,
    use_move: bool,
    num_transactions: usize,
    committee_size: usize,
    db_cpus: usize,
) -> MicroBenchmarkResult {
    assert_eq!(
        num_transactions % batch_size,
        0,
        "num_transactions must be a multiple of batch_size",
    );
    // In order to simplify things, we send chunks on each connection and try to ensure all connections have equal load
    assert!(
        (num_transactions % connections) == 0,
        "num_transactions {} must be a multiple of the number of TCP connections {}",
        num_transactions,
        connections
    );
    let mut tx_cr = TransactionCreator::new(committee_size, db_cpus);

    let chunk_size = batch_size * connections;
    let txes = tx_cr.generate_transactions(
        connections,
        use_move,
        batch_size * connections,
        num_transactions / chunk_size,
    );

    // Make a multi-threaded runtime for the authority
    thread::spawn(move || {
        get_multithread_runtime().block_on(async move {
            let server = spawn_authority_server(network_server, tx_cr.authority_state).await;
            if let Err(e) = server.join().await {
                error!("Server ended with an error: {e}");
            }
        });
    });

    // Wait for server start
    sleep(Duration::from_secs(3));

    // Follower to observe batches
    let follower_network_client = network_client.clone();
    thread::spawn(move || {
        get_multithread_runtime()
            .block_on(async move { run_follower(follower_network_client).await });
    });

    sleep(Duration::from_secs(3));

    // Run the load
    let (elapsed, resp) = get_multithread_runtime()
        .block_on(async move { send_tx_chunks(txes, network_client, connections).await });

    let _: Vec<_> = resp
        .par_iter()
        .map(|q| check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..])))
        .collect();
    MicroBenchmarkResult::Throughput {
        chunk_throughput: calculate_throughput(num_transactions, elapsed),
    }
}

fn run_latency_microbench(
    network_client: NetworkClient,
    network_server: NetworkServer,
    connections: usize,
    use_move: bool,
    committee_size: usize,
    db_cpus: usize,
    num_chunks: usize,
    chunk_size: usize,
    period_us: u64,
) -> MicroBenchmarkResult {
    // In order to simplify things, we send chunks on each connection and try to ensure all connections have equal load
    assert!(
        (num_chunks * chunk_size % connections) == 0,
        "num_chunks * chunk_size {} must be a multiple of the number of TCP connections {}",
        num_chunks * chunk_size,
        connections
    );
    let mut tx_cr = TransactionCreator::new(committee_size, db_cpus);

    // These TXes are to load the network
    let load_gen_txes = tx_cr.generate_transactions(connections, use_move, chunk_size, num_chunks);

    // These are tracer TXes used for measuring latency
    let tracer_txes = tx_cr.generate_transactions(1, use_move, 1, num_chunks);

    // Make a multi-threaded runtime for the authority
    thread::spawn(move || {
        get_multithread_runtime().block_on(async move {
            let server = spawn_authority_server(network_server, tx_cr.authority_state).await;
            if let Err(e) = server.join().await {
                error!("Server ended with an error: {e}");
            }
        });
    });

    // Wait for server start
    sleep(Duration::from_secs(3));
    let runtime = get_multithread_runtime();
    // Prep the generators
    let (mut load_gen, mut tracer_gen) = runtime.block_on(async move {
        join!(
            FixedRateLoadGenerator::new(
                load_gen_txes,
                period_us,
                network_client.clone(),
                connections,
            ),
            FixedRateLoadGenerator::new(tracer_txes, period_us, network_client, 1),
        )
    });

    // Run the load gen and tracers
    let (load_latencies, tracer_latencies) =
        runtime.block_on(async move { join!(load_gen.start(), tracer_gen.start()) });

    MicroBenchmarkResult::Latency {
        load_chunk_size: chunk_size,
        load_latencies,
        tick_period_us: period_us as usize,
        chunk_latencies: tracer_latencies,
    }
}

async fn run_follower(network_client: NetworkClient) {
    // We spawn a second client that listens to the batch interface
    let _batch_client_handle = tokio::task::spawn(async move {
        let authority_client = AuthorityClient::new(network_client);

        let mut start = 0;

        loop {
            let receiver = authority_client
                .handle_batch_streaming_as_stream(BatchInfoRequest {
                    start,
                    end: start + FOLLOWER_BATCH_SIZE,
                })
                .await;

            if let Err(e) = &receiver {
                error!("Listener error: {:?}", e);
                break;
            }
            let mut receiver = receiver.unwrap();

            info!("Start batch listener at sequence: {}.", start);
            while let Some(item) = receiver.next().await {
                match item {
                    Ok(BatchInfoResponseItem(UpdateItem::Transaction((_tx_seq, _tx_digest)))) => {
                        start = _tx_seq + 1;
                    }
                    Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch))) => {
                        info!(
                            "Client received batch up to sequence {}",
                            _signed_batch.batch.next_sequence_number
                        );
                    }
                    Err(err) => {
                        error!("{:?}", err);
                        break;
                    }
                }
            }
        }
    });
}

fn get_multithread_runtime() -> Runtime {
    Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(32 * 1024 * 1024)
        .worker_threads(usize::min(num_cpus::get(), 24))
        .build()
        .unwrap()
}