Skip to content

Commit

Permalink
Reverted info logs to warns. Synced to latest
Browse files Browse the repository at this point in the history
  • Loading branch information
ade authored and ade committed Dec 30, 2021
2 parents dc416d2 + 1a9bdf4 commit c753062
Show file tree
Hide file tree
Showing 19 changed files with 1,019 additions and 914 deletions.
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ FastPay allows a set of distributed authorities, some of which are Byzantine, to
cargo build --release
cd target/release
rm -f *.json *.toml
rm -rf db*

# Create configuration files for 4 authorities with 4 shards each.
# Create DB dirs and configuration files for 4 authorities.
# * Private server states are stored in `server*.json`.
# * `committee.json` is the public description of the FastPay committee.
for I in 1 2 3 4
do
./server --server server"$I".json generate --host 127.0.0.1 --port 9"$I"00 --shards 4 >> committee.json
mkdir ./db"$I"
./server --server server"$I".json generate --host 127.0.0.1 --port 9"$I"00 --database-path ./db"$I" >> committee.json
done

# Create configuration files for 100 user accounts, with 4 gas objects per account and 200 value each.
Expand All @@ -34,11 +36,8 @@ done
# Start servers
for I in 1 2 3 4
do
for J in $(seq 0 3)
do
./server --server server"$I".json run --shard "$J" --initial-accounts initial_accounts.toml --committee committee.json &
done
done
./server --server server"$I".json run --initial-accounts initial_accounts.toml --committee committee.json &
done

# Query account addresses
./client --committee committee.json --accounts accounts.json query-accounts-addrs
Expand Down
1 change: 1 addition & 0 deletions fastpay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tempfile = "3.2.0"
tokio = { version = "0.2.22", features = ["full"] }
rand = "0.7.3"
toml = "0.5.8"
num_cpus = "1.13.1"

fastpay_core = { path = "../fastpay_core" }
fastx-types = { path = "../fastx_types" }
Expand Down
141 changes: 63 additions & 78 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use fastpay_core::authority::*;
use fastx_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
use futures::stream::StreamExt;
use log::*;
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use tokio::runtime::Runtime;
use tokio::{runtime::Builder, time};

use std::env;
use std::fs;
use std::thread;

#[derive(Debug, Clone, StructOpt)]
Expand All @@ -25,7 +25,7 @@ use std::thread;
)]
struct ClientServerBenchmark {
/// Choose a network protocol between Udp and Tcp
#[structopt(long, default_value = "udp")]
#[structopt(long, default_value = "tcp")]
protocol: transport::NetworkProtocol,
/// Hostname
#[structopt(long, default_value = "127.0.0.1")]
Expand All @@ -36,9 +36,6 @@ struct ClientServerBenchmark {
/// Size of the FastPay committee
#[structopt(long, default_value = "10")]
committee_size: usize,
/// Number of shards per FastPay authority
#[structopt(long, default_value = "15")]
num_shards: u32,
/// Maximum number of requests in flight (0 for blocking client)
#[structopt(long, default_value = "1000")]
max_in_flight: usize,
Expand All @@ -59,30 +56,27 @@ struct ClientServerBenchmark {
fn main() {
env_logger::from_env(env_logger::Env::default().default_filter_or("info")).init();
let benchmark = ClientServerBenchmark::from_args();

let (states, orders) = benchmark.make_structures();

// Start the servers on the thread pool
for state in states {
// Make special single-core runtime for each server
let b = benchmark.clone();
thread::spawn(move || {
let mut runtime = Builder::new()
.enable_all()
.basic_scheduler()
.thread_stack_size(15 * 1024 * 1024)
.build()
.unwrap();

runtime.block_on(async move {
let server = b.spawn_server(state).await;
if let Err(err) = server.join().await {
error!("Server ended with an error: {}", err);
}
});
let (state, orders) = benchmark.make_structures();

// Make multi-threaded runtime for the authority
let b = benchmark.clone();
thread::spawn(move || {
let mut runtime = Builder::new()
.enable_all()
.threaded_scheduler()
.thread_stack_size(15 * 1024 * 1024)
.build()
.unwrap();

runtime.block_on(async move {
let server = b.spawn_server(state).await;
if let Err(err) = server.join().await {
error!("Server ended with an error: {}", err);
}
});
}
});

// Make a single-core runtime for the client.
let mut runtime = Builder::new()
.enable_all()
.basic_scheduler()
Expand All @@ -93,7 +87,7 @@ fn main() {
}

impl ClientServerBenchmark {
fn make_structures(&self) -> (Vec<AuthorityState>, Vec<(u32, Bytes)>) {
fn make_structures(&self) -> (AuthorityState, Vec<Bytes>) {
info!("Preparing accounts.");
let mut keys = Vec::new();
for _ in 0..self.committee_size {
Expand All @@ -104,37 +98,35 @@ impl ClientServerBenchmark {
total_votes: self.committee_size,
};

// Pick an authority and create one state per shard.
// Pick an authority and create state.
let (public_auth0, secret_auth0) = keys.pop().unwrap();
let mut states = Vec::new();
for i in 0..self.num_shards {
let state = AuthorityState::new_shard(
committee.clone(),
public_auth0,
secret_auth0.copy(),
i as u32,
self.num_shards,
);
states.push(state);
}

// Create a random directory to store the DB
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

let state = AuthorityState::new(committee.clone(), public_auth0, secret_auth0.copy(), path);

// Seed user accounts.
let mut rt = Runtime::new().unwrap();
let mut account_objects = Vec::new();
for _ in 0..self.num_accounts {
let keypair = get_key_pair();
let object_id: ObjectID = ObjectID::random();
let i = AuthorityState::get_shard(self.num_shards, &object_id) as usize;
assert!(states[i].in_shard(&object_id));
let mut client = Object::with_id_for_testing(object_id);
client.transfer(keypair.0);
states[i].init_order_lock(client.to_object_reference());
states[i].insert_object(client);
account_objects.push((keypair.0, object_id, keypair.1));
}
rt.block_on(async {
for _ in 0..self.num_accounts {
let keypair = get_key_pair();
let object_id: ObjectID = ObjectID::random();

let client = Object::with_id_owner_for_testing(object_id, keypair.0);
assert!(client.next_sequence_number == SequenceNumber::from(0));
state.init_order_lock(client.to_object_reference()).await;
state.insert_object(client).await;
account_objects.push((keypair.0, object_id, keypair.1));
}
});

info!("Preparing transactions.");
// Make one transaction per account (transfer order + confirmation).
let mut orders: Vec<(u32, Bytes)> = Vec::new();
let mut orders: Vec<Bytes> = Vec::new();
let mut next_recipient = get_key_pair().0;
for (pubx, object_id, secx) in account_objects.iter() {
let transfer = Transfer {
Expand All @@ -145,12 +137,11 @@ impl ClientServerBenchmark {
user_data: UserData::default(),
};
next_recipient = *pubx;
let order = Order::new_transfer(transfer.clone(), secx);
let shard = AuthorityState::get_shard(self.num_shards, order.object_id());
let order = Order::new_transfer(transfer, secx);

// Serialize order
let bufx = serialize_order(&order);
assert!(!bufx.is_empty());
let serialized_order = serialize_order(&order);
assert!(!serialized_order.is_empty());

// Make certificate
let mut certificate = CertifiedOrder {
Expand All @@ -163,14 +154,14 @@ impl ClientServerBenchmark {
certificate.signatures.push((*pubx, sig));
}

let bufx2 = serialize_cert(&certificate);
assert!(!bufx2.is_empty());
let serialized_certificate = serialize_cert(&certificate);
assert!(!serialized_certificate.is_empty());

orders.push((shard, bufx2.into()));
orders.push((shard, bufx.into()));
orders.push(serialized_order.into());
orders.push(serialized_certificate.into());
}

(states, orders)
(state, orders)
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
Expand All @@ -184,14 +175,15 @@ impl ClientServerBenchmark {
server.spawn().await.unwrap()
}

async fn launch_client(&self, mut orders: Vec<(u32, Bytes)>) {
async fn launch_client(&self, mut orders: Vec<Bytes>) {
time::delay_for(Duration::from_millis(1000)).await;

let items_number = orders.len() / 2;
let time_start = Instant::now();

let max_in_flight = (self.max_in_flight / self.num_shards as usize) as usize;
info!("Set max_in_flight per shard to {}", max_in_flight);
let connections: usize = num_cpus::get();
let max_in_flight = self.max_in_flight / connections as usize;
info!("Set max_in_flight to {}", max_in_flight);

info!("Sending requests.");
if self.max_in_flight > 0 {
Expand All @@ -204,22 +196,15 @@ impl ClientServerBenchmark {
Duration::from_micros(self.recv_timeout_us),
max_in_flight as u64,
);
let mut sharded_requests = HashMap::new();
for (shard, buf) in orders.iter().rev() {
sharded_requests
.entry(*shard)
.or_insert_with(Vec::new)
.push(buf.clone());
}
let responses = mass_client.run(sharded_requests).concat().await;

let responses = mass_client.run(orders, connections).concat().await;
info!("Received {} responses.", responses.len(),);
} else {
// Use actual client core
let client = network::Client::new(
self.protocol,
self.host.clone(),
self.port,
self.num_shards,
self.buffer_size,
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
Expand All @@ -229,8 +214,8 @@ impl ClientServerBenchmark {
if orders.len() % 1000 == 0 {
info!("Process message {}...", orders.len());
}
let (shard, order) = orders.pop().unwrap();
let status = client.send_recv_bytes(shard, order.to_vec()).await;
let order = orders.pop().unwrap();
let status = client.send_recv_bytes(order.to_vec()).await;
match status {
Ok(info) => {
debug!("Query response: {:?}", info);
Expand Down
34 changes: 14 additions & 20 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![deny(warnings)]

use fastpay::{config::*, network, transport};
use fastpay_core::{authority::*, client::*};
use fastpay_core::client::*;
use fastx_types::{base_types::*, committee::Committee, messages::*, serialize::*};

use bytes::Bytes;
Expand All @@ -30,7 +30,6 @@ fn make_authority_clients(
config.network_protocol,
config.host,
config.base_port,
config.num_shards,
buffer_size,
send_timeout,
recv_timeout,
Expand All @@ -46,7 +45,7 @@ fn make_authority_mass_clients(
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
max_in_flight: u64,
) -> Vec<(u32, network::MassClient)> {
) -> Vec<network::MassClient> {
let mut authority_clients = Vec::new();
for config in &committee_config.authorities {
let client = network::MassClient::new(
Expand All @@ -56,9 +55,9 @@ fn make_authority_mass_clients(
buffer_size,
send_timeout,
recv_timeout,
max_in_flight / config.num_shards as u64, // Distribute window to diff shards
max_in_flight,
);
authority_clients.push((config.num_shards, client));
authority_clients.push(client);
}
authority_clients
}
Expand Down Expand Up @@ -219,17 +218,12 @@ async fn mass_broadcast_orders(
max_in_flight,
);
let mut streams = Vec::new();
for (num_shards, client) in authority_clients {
// Re-index orders by shard for this particular authority client.
let mut sharded_requests = HashMap::new();
for (object_id, buf) in &orders {
let shard = AuthorityState::get_shard(num_shards, object_id);
sharded_requests
.entry(shard)
.or_insert_with(Vec::new)
.push(buf.clone());
for client in authority_clients {
let mut requests = Vec::new();
for (_object_id, buf) in &orders {
requests.push(buf.clone());
}
streams.push(client.run(sharded_requests));
streams.push(client.run(requests, 1));
}
let responses = futures::stream::select_all(streams).concat().await;
let time_elapsed = time_start.elapsed();
Expand Down Expand Up @@ -489,7 +483,7 @@ fn main() {

let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
info!("Starting benchmark phase 1 (transfer orders)");
warn!("Starting benchmark phase 1 (transfer orders)");
let (orders, serialize_orders) =
make_benchmark_transfer_orders(&mut accounts_config, max_orders);
let responses = mass_broadcast_orders(
Expand All @@ -510,13 +504,13 @@ fn main() {
.collect();
info!("Received {} valid votes.", votes.len());

info!("Starting benchmark phase 2 (confirmation orders)");
warn!("Starting benchmark phase 2 (confirmation orders)");
let certificates = if let Some(files) = server_configs {
warn!("Using server configs provided by --server-configs");
let files = files.iter().map(AsRef::as_ref).collect();
make_benchmark_certificates_from_orders_and_server_configs(orders, files)
} else {
info!("Using committee config");
warn!("Using committee config");
make_benchmark_certificates_from_votes(&committee_config, votes)
};
let responses = mass_broadcast_orders(
Expand All @@ -540,13 +534,13 @@ fn main() {
}
None => acc,
});
info!(
warn!(
"Received {} valid confirmations for {} transfers.",
num_valid,
confirmed.len()
);

info!("Updating local state of user accounts");
warn!("Updating local state of user accounts");
// Make sure that the local balances are accurate so that future
// balance checks of the non-mass client pass.
mass_update_recipients(&mut accounts_config, certificates);
Expand Down
2 changes: 1 addition & 1 deletion fastpay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct AuthorityConfig {
pub address: FastPayAddress,
pub host: String,
pub base_port: u32,
pub num_shards: u32,
pub database_path: String,
}

impl AuthorityConfig {
Expand Down
Loading

0 comments on commit c753062

Please sign in to comment.