Skip to content

Commit

Permalink
Fix pinning (solana-labs#6604)
Browse files Browse the repository at this point in the history
Remove Deref implementations and add more pass-throughs to the PinnedVec
wrapper.
Warm recyclers
set_pinnable
  • Loading branch information
sakridge authored Nov 8, 2019
1 parent 80a89b5 commit 8e81bc1
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ impl Archiver {
let res = r_reader.recv_timeout(Duration::new(1, 0));
if let Ok(mut packets) = res {
while let Ok(mut more) = r_reader.try_recv() {
packets.packets.append(&mut more.packets);
packets.packets.append_pinned(&mut more.packets);
}
let shreds: Vec<Shred> = packets
.packets
Expand Down
3 changes: 2 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use solana_ledger::{
};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn};
use solana_perf::cuda_runtime::PinnedVec;
use solana_perf::perf_libs;
use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch};
use solana_sdk::{
Expand Down Expand Up @@ -789,7 +790,7 @@ impl BankingStage {
filtered_unprocessed_packet_indexes
}

fn generate_packet_indexes(vers: &[Packet]) -> Vec<usize> {
fn generate_packet_indexes(vers: &PinnedVec<Packet>) -> Vec<usize> {
vers.iter()
.enumerate()
.filter_map(
Expand Down
4 changes: 3 additions & 1 deletion core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
use crate::packet::PacketsRecycler;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::service::Service;
Expand Down Expand Up @@ -92,7 +93,8 @@ impl FetchStage {
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self {
let recycler = Recycler::default();
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);

let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
Expand Down
5 changes: 3 additions & 2 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use crate::packet::Packet;
use crate::packet::{Packet, PacketsRecycler};
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender};
use solana_perf::cuda_runtime::PinnedVec;
Expand Down Expand Up @@ -67,7 +67,8 @@ impl ShredFetchStage {
sender: &PacketSender,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = Recycler::default();
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);

let tvu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
Expand Down
4 changes: 2 additions & 2 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
fn default() -> Self {
init();
Self {
recycler: Recycler::default(),
recycler_out: Recycler::default(),
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ impl ShredSigVerifier {
Self {
bank_forks,
leader_schedule_cache,
recycler_offsets: Recycler::default(),
recycler_out: Recycler::default(),
recycler_offsets: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
}
}
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ mod test {
service::Service,
};
use crossbeam_channel::unbounded;
use rand::{seq::SliceRandom, thread_rng};
use rand::thread_rng;
use solana_ledger::shred::DataShredHeader;
use solana_ledger::{
blocktree::{get_tmp_ledger_path, make_many_slot_entries, Blocktree},
Expand Down
32 changes: 18 additions & 14 deletions ledger/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,22 @@ fn slot_key_data_for_gpu<
}
}
let mut keyvec = recycler_keys.allocate("shred_gpu_pubkeys");
keyvec.set_pinnable();
let mut slot_to_key_ix = HashMap::new();
for (i, (k, slots)) in keys_to_slots.iter().enumerate() {
keyvec.extend(k.as_ref());

let keyvec_size = keys_to_slots.len() * size_of::<T>();
keyvec.resize(keyvec_size, 0);

for (i, (k, slots)) in keys_to_slots.iter_mut().enumerate() {
let start = i * size_of::<T>();
let end = start + size_of::<T>();
keyvec[start..end].copy_from_slice(k.as_ref());
for s in slots {
slot_to_key_ix.insert(s, i);
}
}
let mut offsets = recycler_offsets.allocate("shred_offsets");
offsets.set_pinnable();
slots.iter().for_each(|packet_slots| {
packet_slots.iter().for_each(|slot| {
offsets
Expand All @@ -145,18 +153,10 @@ fn slot_key_data_for_gpu<
//TODO: GPU needs a more opaque interface, which can handle variable sized structures for data
//Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems
let num_in_packets = (keyvec.len() + (size_of::<Packet>() - 1)) / size_of::<Packet>();
trace!("num_in_packets {}", num_in_packets);
//number of bytes missing
let missing = num_in_packets * size_of::<Packet>() - keyvec.len();
trace!("missing {}", missing);
//extra Pubkeys needed to fill the buffer
let extra = (missing + size_of::<T>() - 1) / size_of::<T>();
trace!("extra {}", extra);
trace!("keyvec {}", keyvec.len());
keyvec.resize(keyvec.len() + extra, 0u8);
trace!("keyvec {}", keyvec.len());
trace!("keyvec {:?}", keyvec);
trace!("offsets {:?}", offsets);
keyvec.resize(num_in_packets * size_of::<Packet>(), 0u8);
trace!("keyvec.len: {}", keyvec.len());
trace!("keyvec: {:?}", keyvec);
trace!("offsets: {:?}", offsets);
(keyvec, offsets, num_in_packets)
}

Expand All @@ -166,8 +166,11 @@ fn shred_gpu_offsets(
recycler_offsets: &Recycler<TxOffset>,
) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
let mut signature_offsets = recycler_offsets.allocate("shred_signatures");
signature_offsets.set_pinnable();
let mut msg_start_offsets = recycler_offsets.allocate("shred_msg_starts");
msg_start_offsets.set_pinnable();
let mut msg_sizes = recycler_offsets.allocate("shred_msg_sizes");
msg_sizes.set_pinnable();
let mut v_sig_lens = vec![];
for batch in batches {
let mut sig_lens = Vec::new();
Expand Down Expand Up @@ -402,6 +405,7 @@ pub fn sign_shreds_gpu(
shred_gpu_offsets(offset, batches, recycler_offsets);
let total_sigs = signature_offsets.len();
let mut signatures_out = recycler_out.allocate("ed25519 signatures");
signatures_out.set_pinnable();
signatures_out.resize(total_sigs * sig_size, 0);
elems.push(
perf_libs::Elems {
Expand Down
1 change: 1 addition & 0 deletions perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rayon = "1.2.0"
serde = "1.0.102"
serde_derive = "1.0.102"
dlopen_derive = "0.1.4"
lazy_static = "1.4.0"
log = "0.4.8"
solana-sdk = { path = "../sdk", version = "0.21.0" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" }
Expand Down
Loading

0 comments on commit 8e81bc1

Please sign in to comment.