Skip to content

Commit

Permalink
Cleanup banking bench (#24851)
Browse files Browse the repository at this point in the history
* Cleanup banking bench

* Fully remove assert
  • Loading branch information
carllin authored May 2, 2022
1 parent cb96edc commit e83efe6
Showing 1 changed file with 108 additions and 86 deletions.
194 changes: 108 additions & 86 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use {
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
solana_perf::packet::to_packet_batches,
solana_perf::packet::{to_packet_batches, PacketBatch},
solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
Expand Down Expand Up @@ -55,7 +55,6 @@ fn check_txs(
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
trace!("no bank");
no_bank = true;
break;
}
Expand Down Expand Up @@ -121,18 +120,44 @@ fn make_accounts_txs(
.collect()
}

struct Config {
struct PacketsPerIteration {
packet_batches: Vec<PacketBatch>,
transactions: Vec<Transaction>,
packets_per_batch: usize,
}

impl Config {
fn get_transactions_index(&self, chunk_index: usize) -> usize {
chunk_index * self.packets_per_batch
impl PacketsPerIteration {
fn new(
packets_per_batch: usize,
batches_per_iteration: usize,
genesis_hash: Hash,
write_lock_contention: WriteLockContention,
) -> Self {
let total_num_transactions = packets_per_batch * batches_per_iteration;
let transactions = make_accounts_txs(
total_num_transactions,
packets_per_batch,
genesis_hash,
write_lock_contention,
);

let packet_batches: Vec<PacketBatch> = to_packet_batches(&transactions, packets_per_batch);
assert_eq!(packet_batches.len(), batches_per_iteration);
Self {
packet_batches,
transactions,
packets_per_batch,
}
}
}

fn bytes_as_usize(bytes: &[u8]) -> usize {
bytes[0] as usize | (bytes[1] as usize) << 8
fn refresh_blockhash(&mut self, new_blockhash: Hash) {
for tx in self.transactions.iter_mut() {
tx.message.recent_blockhash = new_blockhash;
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]);
}
self.packet_batches = to_packet_batches(&self.transactions, self.packets_per_batch);
}
}

#[allow(clippy::cognitive_complexity)]
Expand All @@ -142,6 +167,12 @@ fn main() {
let matches = Command::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.arg(
Arg::new("iterations")
.long("iterations")
.takes_value(true)
.help("Number of test iterations"),
)
.arg(
Arg::new("num_chunks")
.long("num-chunks")
Expand Down Expand Up @@ -169,12 +200,6 @@ fn main() {
.possible_values(WriteLockContention::possible_values())
.help("Accounts that test transactions write lock"),
)
.arg(
Arg::new("iterations")
.long("iterations")
.takes_value(true)
.help("Number of iterations"),
)
.arg(
Arg::new("batches_per_iteration")
.long("batches-per-iteration")
Expand Down Expand Up @@ -205,7 +230,6 @@ fn main() {
.value_of_t::<WriteLockContention>("write_lock_contention")
.unwrap_or(WriteLockContention::None);

let total_num_transactions = num_chunks * packets_per_batch * batches_per_iteration;
let mint_total = 1_000_000_000_000;
let GenesisConfigInfo {
genesis_config,
Expand All @@ -226,55 +250,72 @@ fn main() {
.unwrap()
.set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);

let mut all_packets: Vec<PacketsPerIteration> = std::iter::from_fn(|| {
Some(PacketsPerIteration::new(
packets_per_batch,
batches_per_iteration,
genesis_config.hash(),
write_lock_contention,
))
})
.take(num_chunks)
.collect();

// fund all the accounts
let total_num_transactions: u64 = all_packets
.iter()
.map(|packets_for_single_iteration| packets_for_single_iteration.transactions.len() as u64)
.sum();
info!(
"threads: {} txs: {}",
num_banking_threads, total_num_transactions
);

let mut transactions = make_accounts_txs(
total_num_transactions,
packets_per_batch,
genesis_config.hash(),
write_lock_contention,
);

// fund all the accounts
transactions.iter().for_each(|tx| {
let mut fund = system_transaction::transfer(
&mint_keypair,
&tx.message.account_keys[0],
mint_total / total_num_transactions as u64,
genesis_config.hash(),
);
// Ignore any pesky duplicate signature errors in the case we are using single-payer
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
fund.signatures = vec![Signature::new(&sig[0..64])];
let x = bank.process_transaction(&fund);
x.unwrap();
all_packets.iter().for_each(|packets_for_single_iteration| {
packets_for_single_iteration
.transactions
.iter()
.for_each(|tx| {
let mut fund = system_transaction::transfer(
&mint_keypair,
&tx.message.account_keys[0],
mint_total / total_num_transactions,
genesis_config.hash(),
);
// Ignore any pesky duplicate signature errors in the case we are using single-payer
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
fund.signatures = vec![Signature::new(&sig[0..64])];
bank.process_transaction(&fund).unwrap();
});
});

let skip_sanity = matches.is_present("skip_sanity");
if !skip_sanity {
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(tx);
assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
all_packets.iter().for_each(|packets_for_single_iteration| {
//sanity check, make sure all the transactions can execute sequentially
packets_for_single_iteration
.transactions
.iter()
.for_each(|tx| {
let res = bank.process_transaction(tx);
assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
});
});
bank.clear_signatures();

if write_lock_contention == WriteLockContention::None {
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(transactions.iter());
for r in res {
assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
}
bank.clear_signatures();
all_packets.iter().for_each(|packets_for_single_iteration| {
//sanity check, make sure all the transactions can execute in parallel
let res =
bank.process_transactions(packets_for_single_iteration.transactions.iter());
for r in res {
assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
}
bank.clear_signatures();
});
}
}

let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_batch);
assert_eq!(verified.len(), num_chunks * batches_per_iteration);

let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Expand Down Expand Up @@ -306,9 +347,6 @@ fn main() {
);
poh_recorder.lock().unwrap().set_bank(&bank);

let chunk_len = batches_per_iteration;
let mut start = 0;

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
Expand All @@ -319,36 +357,26 @@ fn main() {
let mut txs_processed = 0;
let mut root = 1;
let collector = solana_sdk::pubkey::new_rand();
let config = Config { packets_per_batch };
let mut total_sent = 0;
for _ in 0..iterations {
for current_iteration_index in 0..iterations {
trace!("RUNNING ITERATION {}", current_iteration_index);
let now = Instant::now();
let mut sent = 0;

for (i, v) in verified[start..start + chunk_len].chunks(1).enumerate() {
let mut byte = 0;
let index = config.get_transactions_index(start + i);
if index < transactions.len() {
byte = bytes_as_usize(transactions[index].signatures[0].as_ref());
}
let packets_for_this_iteration = &all_packets[current_iteration_index % num_chunks];
for (packet_batch_index, packet_batch) in
packets_for_this_iteration.packet_batches.iter().enumerate()
{
sent += packet_batch.packets.len();
trace!(
"sending... {}..{} {} v.len: {} sig: {} transactions.len: {} index: {}",
start + i,
start + chunk_len,
"Sending PacketBatch index {}, {}",
packet_batch_index,
timestamp(),
v.len(),
byte,
transactions.len(),
index,
);
for xv in v {
sent += xv.packets.len();
}
verified_sender.send(v.to_vec()).unwrap();
verified_sender.send(vec![packet_batch.clone()]).unwrap();
}
let start_tx_index = config.get_transactions_index(start);
let end_tx_index = config.get_transactions_index(start + chunk_len);
for tx in &transactions[start_tx_index..end_tx_index] {

for tx in &packets_for_this_iteration.transactions {
loop {
if bank.get_signature_status(&tx.signatures[0]).is_some() {
break;
Expand All @@ -361,7 +389,7 @@ fn main() {
}
if check_txs(
&signal_receiver,
total_num_transactions / num_chunks,
packets_for_this_iteration.transactions.len(),
&poh_recorder,
) {
debug!(
Expand All @@ -370,7 +398,6 @@ fn main() {
bank.transaction_count(),
txs_processed
);
assert!(txs_processed < bank.transaction_count());
txs_processed = bank.transaction_count();
tx_total_us += duration_as_us(&now.elapsed());

Expand Down Expand Up @@ -422,22 +449,17 @@ fn main() {
debug!(
"time: {} us checked: {} sent: {}",
duration_as_us(&now.elapsed()),
total_num_transactions / num_chunks,
total_num_transactions / num_chunks as u64,
sent,
);
total_sent += sent;

if bank.slot() > 0 && bank.slot() % 16 == 0 {
for tx in transactions.iter_mut() {
tx.message.recent_blockhash = bank.last_blockhash();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]);
if current_iteration_index % 16 == 0 {
let last_blockhash = bank.last_blockhash();
for packets_for_single_iteration in all_packets.iter_mut() {
packets_for_single_iteration.refresh_blockhash(last_blockhash);
}
verified = to_packet_batches(&transactions.clone(), packets_per_batch);
}

start += chunk_len;
start %= verified.len();
}
let txs_processed = bank_forks.working_bank().transaction_count();
debug!("processed: {} base: {}", txs_processed, base_tx_count);
Expand Down

0 comments on commit e83efe6

Please sign in to comment.