Skip to content

Commit

Permalink
Remove redundant threadpools in sigverify (solana-labs#7888)
Browse files Browse the repository at this point in the history
* Limit the number of thread pools sigverify creates

* Name local threadpools
  • Loading branch information
sagar-solana authored Jan 21, 2020
1 parent 1fe11e9 commit 2dd8ab1
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ sys-info = "0.5.8"
tar = "0.4.26"
thiserror = "1.0"
tempfile = "3.1.0"
lazy_static = "1.4.0"

[dependencies.rocksdb]
# Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts
Expand Down
1 change: 1 addition & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_{}", ix))
.build()
.unwrap()));

Expand Down
1 change: 1 addition & 0 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use thiserror::Error;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix))
.build()
.unwrap())
);
Expand Down
1 change: 1 addition & 0 deletions ledger/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{cmp, thread};

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("entry_{}", ix))
.build()
.unwrap()));

Expand Down
3 changes: 3 additions & 0 deletions ledger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ extern crate solana_metrics;

#[macro_use]
extern crate log;

#[macro_use]
extern crate lazy_static;
1 change: 1 addition & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SL

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("shredder_{}", ix))
.build()
.unwrap()));

Expand Down
132 changes: 63 additions & 69 deletions ledger/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
use std::{cell::RefCell, collections::HashMap, mem::size_of};
use std::{collections::HashMap, mem::size_of};

pub const SIGN_SHRED_GPU_MIN: usize = 256;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("sigverify_shreds_{}", ix))
.build()
.unwrap()));
lazy_static! {
pub static ref SIGVERIFY_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("sigverify_shreds_{}", ix))
.build()
.unwrap();
}

/// Assuming layout is
/// signature: Signature
Expand Down Expand Up @@ -70,18 +72,16 @@ fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap<u64, [u8; 32]>)
use rayon::prelude::*;
let count = batch_size(batches);
debug!("CPU SHRED ECDSA for {}", count);
let rv = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
batches
.into_par_iter()
.map(|p| {
p.packets
.par_iter()
.map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
.collect()
})
.collect()
})
let rv = SIGVERIFY_THREAD_POOL.install(|| {
batches
.into_par_iter()
.map(|p| {
p.packets
.par_iter()
.map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
.collect()
})
.collect()
});
inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
rv
Expand All @@ -97,30 +97,28 @@ fn slot_key_data_for_gpu<
) -> (PinnedVec<u8>, TxOffset, usize) {
//TODO: mark Pubkey::default shreds as failed after the GPU returns
assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default()));
let slots: Vec<Vec<u64>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
batches
.into_par_iter()
.map(|p| {
p.packets
.iter()
.map(|packet| {
let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
if packet.meta.size < slot_end || packet.meta.discard {
return std::u64::MAX;
}
let slot: Option<u64> =
limited_deserialize(&packet.data[slot_start..slot_end]).ok();
match slot {
Some(slot) if slot_keys.get(&slot).is_some() => slot,
_ => std::u64::MAX,
}
})
.collect()
})
.collect()
})
let slots: Vec<Vec<u64>> = SIGVERIFY_THREAD_POOL.install(|| {
batches
.into_par_iter()
.map(|p| {
p.packets
.iter()
.map(|packet| {
let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
if packet.meta.size < slot_end || packet.meta.discard {
return std::u64::MAX;
}
let slot: Option<u64> =
limited_deserialize(&packet.data[slot_start..slot_end]).ok();
match slot {
Some(slot) if slot_keys.get(&slot).is_some() => slot,
_ => std::u64::MAX,
}
})
.collect()
})
.collect()
});
let mut keys_to_slots: HashMap<T, Vec<u64>> = HashMap::new();
for batch in slots.iter() {
Expand Down Expand Up @@ -312,14 +310,12 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
use rayon::prelude::*;
let count = batch_size(batches);
debug!("CPU SHRED ECDSA for {}", count);
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
batches.par_iter_mut().for_each(|p| {
p.packets[..]
.par_iter_mut()
.for_each(|mut p| sign_shred_cpu(keypair, &mut p));
});
})
SIGVERIFY_THREAD_POOL.install(|| {
batches.par_iter_mut().for_each(|p| {
p.packets[..]
.par_iter_mut()
.for_each(|mut p| sign_shred_cpu(keypair, &mut p));
});
});
inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
}
Expand Down Expand Up @@ -425,25 +421,23 @@ pub fn sign_shreds_gpu(
}
sizes[i] += sizes[i - 1];
}
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
batches
.par_iter_mut()
.enumerate()
.for_each(|(batch_ix, batch)| {
let num_packets = sizes[batch_ix];
batch.packets[..]
.par_iter_mut()
.enumerate()
.for_each(|(packet_ix, packet)| {
let sig_ix = packet_ix + num_packets;
let sig_start = sig_ix * sig_size;
let sig_end = sig_start + sig_size;
packet.data[0..sig_size]
.copy_from_slice(&signatures_out[sig_start..sig_end]);
});
});
});
SIGVERIFY_THREAD_POOL.install(|| {
batches
.par_iter_mut()
.enumerate()
.for_each(|(batch_ix, batch)| {
let num_packets = sizes[batch_ix];
batch.packets[..]
.par_iter_mut()
.enumerate()
.for_each(|(packet_ix, packet)| {
let sig_ix = packet_ix + num_packets;
let sig_start = sig_ix * sig_size;
let sig_end = sig_start + sig_size;
packet.data[0..sig_size]
.copy_from_slice(&signatures_out[sig_start..sig_end]);
});
});
});
inc_new_counter_debug!("ed25519_shred_sign_gpu", count);
}
Expand Down
25 changes: 12 additions & 13 deletions perf/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use solana_sdk::short_vec::decode_len;
use solana_sdk::signature::Signature;
#[cfg(test)]
use solana_sdk::transaction::Transaction;
use std::cell::RefCell;
use std::mem::size_of;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("sigverify_{}", ix))
.build()
.unwrap()));
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("sigverify_{}", ix))
.build()
.unwrap();
}

pub type TxOffset = PinnedVec<u32>;

Expand Down Expand Up @@ -247,13 +248,11 @@ pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
use rayon::prelude::*;
let count = batch_size(batches);
debug!("CPU ECDSA for {}", batch_size(batches));
let rv = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
batches
.into_par_iter()
.map(|p| p.packets.par_iter().map(verify_packet).collect())
.collect()
})
let rv = PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.map(|p| p.packets.par_iter().map(verify_packet).collect())
.collect()
});
inc_new_counter_debug!("ed25519_verify_cpu", count);
rv
Expand Down

0 comments on commit 2dd8ab1

Please sign in to comment.