Skip to content

Commit

Permalink
Pass Arc<AtomicBool> by value, not by reference. (solana-labs#31916)
Browse files Browse the repository at this point in the history
`Arc` is already a reference internally, so it does not seem to be
beneficial to pass a reference to it.  Just adds an extra layer of
indirection.

Functions that need to be able to increment `Arc` reference count need
to take `Arc<AtomicBool>`, but those that just want to read the
`AtomicBool` value can accept `&AtomicBool`, making them a bit more
generic.

This change focuses specifically on `Arc<AtomicBool>`.  There are other
uses of `&Arc<T>` in the code base that could be converted in a similar
manner.  But it would make the change even larger.
  • Loading branch information
ilya-bobyr authored Jun 2, 2023
1 parent 2b04f28 commit 4353ac6
Show file tree
Hide file tree
Showing 41 changed files with 181 additions and 195 deletions.
15 changes: 7 additions & 8 deletions bench-tps/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,21 +248,20 @@ where

fn create_sampler_thread<T>(
client: &Arc<T>,
exit_signal: &Arc<AtomicBool>,
exit_signal: Arc<AtomicBool>,
sample_period: u64,
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
) -> JoinHandle<()>
where
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
info!("Sampling TPS every {} second...", sample_period);
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
sample_txs(exit_signal, &maxes, sample_period, &client);
})
.unwrap()
}
Expand Down Expand Up @@ -325,7 +324,7 @@ fn create_sender_threads<T>(
thread_batch_sleep_ms: usize,
total_tx_sent_count: &Arc<AtomicUsize>,
threads: usize,
exit_signal: &Arc<AtomicBool>,
exit_signal: Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
) -> Vec<JoinHandle<()>>
where
Expand Down Expand Up @@ -407,7 +406,7 @@ where
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
let sample_thread = create_sampler_thread(&client, &exit_signal, sample_period, &maxes);
let sample_thread = create_sampler_thread(&client, exit_signal.clone(), sample_period, &maxes);

let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));

Expand Down Expand Up @@ -439,7 +438,7 @@ where
thread_batch_sleep_ms,
&total_tx_sent_count,
threads,
&exit_signal,
exit_signal.clone(),
&shared_tx_active_thread_count,
);

Expand Down Expand Up @@ -786,7 +785,7 @@ fn get_new_latest_blockhash<T: BenchTpsClient + ?Sized>(
}

fn poll_blockhash<T: BenchTpsClient + ?Sized>(
exit_signal: &Arc<AtomicBool>,
exit_signal: &AtomicBool,
blockhash: &Arc<RwLock<Hash>>,
client: &Arc<T>,
id: &Pubkey,
Expand Down Expand Up @@ -836,7 +835,7 @@ fn poll_blockhash<T: BenchTpsClient + ?Sized>(
}

fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
exit_signal: &Arc<AtomicBool>,
exit_signal: &AtomicBool,
shared_txs: &SharedTransactions,
shared_tx_thread_count: &Arc<AtomicIsize>,
total_tx_sent_count: &Arc<AtomicUsize>,
Expand Down
2 changes: 1 addition & 1 deletion bench-tps/src/perf_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct SampleStats {
}

pub fn sample_txs<T>(
exit_signal: &Arc<AtomicBool>,
exit_signal: Arc<AtomicBool>,
sample_stats: &Arc<RwLock<Vec<(String, SampleStats)>>>,
sample_period: u64,
client: &Arc<T>,
Expand Down
12 changes: 6 additions & 6 deletions client-test/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn test_account_subscription() {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Expand Down Expand Up @@ -261,7 +261,7 @@ fn test_block_subscription() {
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
// setup RpcSubscriptions && PubSubService
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
blockstore.clone(),
Expand Down Expand Up @@ -348,7 +348,7 @@ fn test_program_subscription() {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Expand Down Expand Up @@ -434,7 +434,7 @@ fn test_root_subscription() {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks.clone(),
Expand Down Expand Up @@ -485,7 +485,7 @@ fn test_slot_subscription() {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Expand Down Expand Up @@ -561,7 +561,7 @@ async fn test_slot_subscription_async() {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Expand Down
5 changes: 2 additions & 3 deletions client/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl TransactionExecutor {
let sigs = Arc::new(RwLock::new(Vec::new()));
let cleared = Arc::new(RwLock::new(Vec::new()));
let exit = Arc::new(AtomicBool::new(false));
let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, &client);
let sig_clear_t = Self::start_sig_clear_thread(exit.clone(), &sigs, &cleared, &client);
Self {
sigs,
cleared,
Expand Down Expand Up @@ -96,13 +96,12 @@ impl TransactionExecutor {
}

fn start_sig_clear_thread(
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
sigs: &Arc<RwLock<PendingQueue>>,
cleared: &Arc<RwLock<Vec<u64>>>,
client: &Arc<RpcClient>,
) -> JoinHandle<()> {
let sigs = sigs.clone();
let exit = exit.clone();
let cleared = cleared.clone();
let client = client.clone();
Builder::new()
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,7 @@ mod tests {
None,
blockstore.clone(),
false,
&Arc::new(AtomicBool::new(false)),
Arc::new(AtomicBool::new(false)),
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
Expand Down Expand Up @@ -1749,7 +1749,7 @@ mod tests {
None,
blockstore.clone(),
false,
&Arc::new(AtomicBool::new(false)),
Arc::new(AtomicBool::new(false)),
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
Expand Down
3 changes: 1 addition & 2 deletions core/src/cache_block_meta_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ impl CacheBlockMetaService {
pub fn new(
cache_block_meta_receiver: CacheBlockMetaReceiver,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solCacheBlkTime".to_string())
.spawn(move || loop {
Expand Down
30 changes: 16 additions & 14 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,20 @@ impl ClusterInfoVoteListener {
})
.unwrap()
};
let exit_ = exit.clone();
let bank_send_thread = Builder::new()
.name("solCiBankSend".to_string())
.spawn(move || {
let _ = Self::bank_send_loop(
exit_,
verified_vote_label_packets_receiver,
poh_recorder,
&verified_packets_sender,
);
})
.unwrap();
let bank_send_thread = {
let exit = exit.clone();
Builder::new()
.name("solCiBankSend".to_string())
.spawn(move || {
let _ = Self::bank_send_loop(
exit,
verified_vote_label_packets_receiver,
poh_recorder,
&verified_packets_sender,
);
})
.unwrap()
};

let send_thread = Builder::new()
.name("solCiProcVotes".to_string())
Expand Down Expand Up @@ -1447,7 +1449,7 @@ mod tests {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Expand Down Expand Up @@ -1563,7 +1565,7 @@ mod tests {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
exit,
max_complete_transaction_status_slot,
max_complete_rewards_slot,
bank_forks,
Expand Down
9 changes: 4 additions & 5 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,26 @@ pub struct AggregateCommitmentService {

impl AggregateCommitmentService {
pub fn new(
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
subscriptions: Arc<RpcSubscriptions>,
) -> (Sender<CommitmentAggregationData>, Self) {
let (sender, receiver): (
Sender<CommitmentAggregationData>,
Receiver<CommitmentAggregationData>,
) = unbounded();
let exit_ = exit.clone();
(
sender,
Self {
t_commitment: Builder::new()
.name("solAggCommitSvc".to_string())
.spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
if exit.load(Ordering::Relaxed) {
break;
}

if let Err(RecvTimeoutError::Disconnected) =
Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_)
Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit)
{
break;
}
Expand All @@ -90,7 +89,7 @@ impl AggregateCommitmentService {
receiver: &Receiver<CommitmentAggregationData>,
block_commitment_cache: &RwLock<BlockCommitmentCache>,
subscriptions: &Arc<RpcSubscriptions>,
exit: &Arc<AtomicBool>,
exit: &AtomicBool,
) -> Result<(), RecvTimeoutError> {
loop {
if exit.load(Ordering::Relaxed) {
Expand Down
3 changes: 1 addition & 2 deletions core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ impl CompletedDataSetsService {
completed_sets_receiver: CompletedDataSetsReceiver,
blockstore: Arc<Blockstore>,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
max_slots: Arc<MaxSlots>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solComplDataSet".to_string())
.spawn(move || loop {
Expand Down
7 changes: 3 additions & 4 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl FetchStage {
sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce: Duration,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl FetchStage {
sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl FetchStage {
tpu_sockets: Vec<Arc<UdpSocket>>,
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
Expand Down Expand Up @@ -234,7 +234,6 @@ impl FetchStage {
})
.unwrap();

let exit = exit.clone();
let metrics_thread_hdl = Builder::new()
.name("solFetchStgMetr".to_string())
.spawn(move || loop {
Expand Down
3 changes: 1 addition & 2 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ impl LedgerCleanupService {
new_root_receiver: Receiver<Slot>,
blockstore: Arc<Blockstore>,
max_ledger_shreds: u64,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let mut last_purge_slot = 0;

info!(
Expand Down
5 changes: 2 additions & 3 deletions core/src/ledger_metric_report_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ pub struct LedgerMetricReportService {
}

impl LedgerMetricReportService {
pub fn new(blockstore: Arc<Blockstore>, exit: &Arc<AtomicBool>) -> Self {
let exit_signal = exit.clone();
pub fn new(blockstore: Arc<Blockstore>, exit: Arc<AtomicBool>) -> Self {
let t_cf_metric = Builder::new()
.name("solRocksCfMtrcs".to_string())
.spawn(move || loop {
if exit_signal.load(Ordering::Relaxed) {
if exit.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(
Expand Down
7 changes: 3 additions & 4 deletions core/src/poh_timing_report_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ pub struct PohTimingReportService {
}

impl PohTimingReportService {
pub fn new(receiver: PohTimingReceiver, exit: &Arc<AtomicBool>) -> Self {
let exit_signal = exit.clone();
pub fn new(receiver: PohTimingReceiver, exit: Arc<AtomicBool>) -> Self {
let mut poh_timing_reporter = PohTimingReporter::default();
let t_poh_timing = Builder::new()
.name("solPohTimingRpt".to_string())
.spawn(move || loop {
if exit_signal.load(Ordering::Relaxed) {
if exit.load(Ordering::Relaxed) {
break;
}
if let Ok(SlotPohTimingInfo {
Expand Down Expand Up @@ -65,7 +64,7 @@ mod test {
let exit = Arc::new(AtomicBool::new(false));
// Create the service
let poh_timing_report_service =
PohTimingReportService::new(poh_timing_point_receiver, &exit);
PohTimingReportService::new(poh_timing_point_receiver, exit.clone());

// Send SlotPohTimingPoint
let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_start_poh_time_point(
Expand Down
Loading

0 comments on commit 4353ac6

Please sign in to comment.