Skip to content

Commit

Permalink
benchmark: add back in parallelization of sending txns
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed May 3, 2022
1 parent e88470d commit 67a3b44
Showing 1 changed file with 82 additions and 38 deletions.
120 changes: 82 additions & 38 deletions sui/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use futures::{
channel::mpsc::{channel as MpscChannel, Receiver, Sender as MpscSender},
future::try_join_all,
stream::StreamExt,
SinkExt,
};
Expand Down Expand Up @@ -42,27 +43,41 @@ pub fn check_transaction_response(reply_message: Result<TransactionInfoResponse,
pub async fn send_tx_chunks(
tx_chunks: Vec<(Transaction, CertifiedTransaction)>,
net_client: NetworkClient,
_conn: usize,
conn: usize,
) -> (u128, Vec<Result<TransactionInfoResponse, io::Error>>) {
let time_start = Instant::now();

// This probably isn't going to be as fast so we probably want to provide away to send a batch
// of txns to the authority at a time
let client = NetworkAuthorityClient::new(net_client);
let mut tx_resp = Vec::new();
for tx in tx_chunks {
let resp = client
.handle_transaction(tx.0)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
tx_resp.push(resp);
let resp = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate: tx.1 })
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
tx_resp.push(resp);
let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::new(net_client.clone());
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
let mut resps = Vec::new();
for (transaction, certificate) in txns {
let resp1 = client
.handle_transaction(transaction)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let resp2 = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate })
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
resps.push(resp1);
resps.push(resp2);
}
resps
});
tasks.push(task);
}

let tx_resp = try_join_all(tasks)
.await
.unwrap()
.into_iter()
.flatten()
.collect();

let elapsed = time_start.elapsed().as_micros();

(elapsed, tx_resp)
Expand All @@ -71,22 +86,36 @@ pub async fn send_tx_chunks(
pub async fn send_transactions(
tx_chunks: Vec<Transaction>,
net_client: NetworkClient,
_conn: usize,
conn: usize,
) -> (u128, Vec<Result<TransactionInfoResponse, io::Error>>) {
let time_start = Instant::now();

// This probably isn't going to be as fast so we probably want to provide away to send a batch
// of txns to the authority at a time
let client = NetworkAuthorityClient::new(net_client);
let mut tx_resp = Vec::new();
for tx in tx_chunks {
let resp = client
.handle_transaction(tx)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
tx_resp.push(resp);
let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::new(net_client.clone());
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
let mut resps = Vec::new();
for transaction in txns {
let resp = client
.handle_transaction(transaction)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
resps.push(resp);
}
resps
});
tasks.push(task);
}

let tx_resp = try_join_all(tasks)
.await
.unwrap()
.into_iter()
.flatten()
.collect();

let elapsed = time_start.elapsed().as_micros();

(elapsed, tx_resp)
Expand All @@ -95,22 +124,36 @@ pub async fn send_transactions(
pub async fn send_confs(
tx_chunks: Vec<CertifiedTransaction>,
net_client: NetworkClient,
_conn: usize,
conn: usize,
) -> (u128, Vec<Result<TransactionInfoResponse, io::Error>>) {
let time_start = Instant::now();

// This probably isn't going to be as fast so we probably want to provide away to send a batch
// of txns to the authority at a time
let client = NetworkAuthorityClient::new(net_client);
let mut tx_resp = Vec::new();
for tx in tx_chunks {
let resp = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate: tx })
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
tx_resp.push(resp);
let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::new(net_client.clone());
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
let mut resps = Vec::new();
for certificate in txns {
let resp = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate })
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
resps.push(resp);
}
resps
});
tasks.push(task);
}

let tx_resp = try_join_all(tasks)
.await
.unwrap()
.into_iter()
.flatten()
.collect();

let elapsed = time_start.elapsed().as_micros();

(elapsed, tx_resp)
Expand Down Expand Up @@ -167,6 +210,7 @@ impl FixedRateLoadGenerator {
let (result_chann_tx, results_chann_rx) = MpscChannel(transactions.len() * 2);

let conn = connections;
info!("connections: {connections}");
// Spin up a bunch of worker tasks
// Give each task
// Step by 2*conn due to order+confirmation, with `conn` tcp connections
Expand Down

0 comments on commit 67a3b44

Please sign in to comment.