diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index fa13ce961f63ac..e73a57b3db4cdc 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -248,7 +248,7 @@ where fn create_sampler_thread( client: &Arc, - exit_signal: &Arc, + exit_signal: Arc, sample_period: u64, maxes: &Arc>>, ) -> JoinHandle<()> @@ -256,13 +256,12 @@ where T: 'static + BenchTpsClient + Send + Sync + ?Sized, { info!("Sampling TPS every {} second...", sample_period); - let exit_signal = exit_signal.clone(); let maxes = maxes.clone(); let client = client.clone(); Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_txs(&exit_signal, &maxes, sample_period, &client); + sample_txs(exit_signal, &maxes, sample_period, &client); }) .unwrap() } @@ -325,7 +324,7 @@ fn create_sender_threads( thread_batch_sleep_ms: usize, total_tx_sent_count: &Arc, threads: usize, - exit_signal: &Arc, + exit_signal: Arc, shared_tx_active_thread_count: &Arc, ) -> Vec> where @@ -407,7 +406,7 @@ where // collect the max transaction rate and total tx count seen let maxes = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds - let sample_thread = create_sampler_thread(&client, &exit_signal, sample_period, &maxes); + let sample_thread = create_sampler_thread(&client, exit_signal.clone(), sample_period, &maxes); let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); @@ -439,7 +438,7 @@ where thread_batch_sleep_ms, &total_tx_sent_count, threads, - &exit_signal, + exit_signal.clone(), &shared_tx_active_thread_count, ); @@ -786,7 +785,7 @@ fn get_new_latest_blockhash( } fn poll_blockhash( - exit_signal: &Arc, + exit_signal: &AtomicBool, blockhash: &Arc>, client: &Arc, id: &Pubkey, @@ -836,7 +835,7 @@ fn poll_blockhash( } fn do_tx_transfers( - exit_signal: &Arc, + exit_signal: &AtomicBool, shared_txs: &SharedTransactions, shared_tx_thread_count: &Arc, total_tx_sent_count: &Arc, diff --git a/bench-tps/src/perf_utils.rs b/bench-tps/src/perf_utils.rs index 5e80695cd5a123..49399723cf0fb1 100644 --- a/bench-tps/src/perf_utils.rs +++ b/bench-tps/src/perf_utils.rs @@ -23,7 +23,7 @@ pub struct SampleStats { } pub fn sample_txs( - exit_signal: &Arc, + exit_signal: Arc, sample_stats: &Arc>>, sample_period: u64, client: &Arc, diff --git a/client-test/tests/client.rs b/client-test/tests/client.rs index 91650bf6c88e16..8d06885e722856 100644 --- a/client-test/tests/client.rs +++ b/client-test/tests/client.rs @@ -142,7 +142,7 @@ fn test_account_subscription() { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -261,7 +261,7 @@ fn test_block_subscription() { let max_complete_rewards_slot = Arc::new(AtomicU64::default()); // setup RpcSubscriptions && PubSubService let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore.clone(), @@ -348,7 +348,7 @@ fn test_program_subscription() { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -434,7 +434,7 @@ fn test_root_subscription() { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -485,7 +485,7 @@ fn test_slot_subscription() { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -561,7 +561,7 @@ async fn test_slot_subscription_async() { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, diff --git a/client/src/transaction_executor.rs b/client/src/transaction_executor.rs index 512ed43bb5acf6..1dd9e860cad8b0 100644 --- a/client/src/transaction_executor.rs +++ b/client/src/transaction_executor.rs @@ -51,7 +51,7 @@ impl TransactionExecutor { let sigs = Arc::new(RwLock::new(Vec::new())); let cleared = Arc::new(RwLock::new(Vec::new())); let exit = Arc::new(AtomicBool::new(false)); - let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, &client); + let sig_clear_t = Self::start_sig_clear_thread(exit.clone(), &sigs, &cleared, &client); Self { sigs, cleared, @@ -96,13 +96,12 @@ impl TransactionExecutor { } fn start_sig_clear_thread( - exit: &Arc, + exit: Arc, sigs: &Arc>, cleared: &Arc>>, client: &Arc, ) -> JoinHandle<()> { let sigs = sigs.clone(); - let exit = exit.clone(); let cleared = cleared.clone(); let client = client.clone(); Builder::new() diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index ba58bd48d07124..ee073fc7710af4 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -1611,7 +1611,7 @@ mod tests { None, blockstore.clone(), false, - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -1749,7 +1749,7 @@ mod tests { None, blockstore.clone(), false, - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); diff --git a/core/src/cache_block_meta_service.rs b/core/src/cache_block_meta_service.rs index 507149b13dab39..b868352040546e 100644 --- a/core/src/cache_block_meta_service.rs +++ b/core/src/cache_block_meta_service.rs @@ -26,9 +26,8 @@ impl CacheBlockMetaService { pub fn new( cache_block_meta_receiver: CacheBlockMetaReceiver, blockstore: Arc, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let thread_hdl = Builder::new() .name("solCacheBlkTime".to_string()) .spawn(move || loop { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d61d5b60cd284c..578986dcd456df 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -264,18 +264,20 @@ impl ClusterInfoVoteListener { }) .unwrap() }; - let exit_ = exit.clone(); - let bank_send_thread = Builder::new() - .name("solCiBankSend".to_string()) - .spawn(move || { - let _ = Self::bank_send_loop( - exit_, - verified_vote_label_packets_receiver, - poh_recorder, - &verified_packets_sender, - ); - }) - .unwrap(); + let bank_send_thread = { + let exit = exit.clone(); + Builder::new() + .name("solCiBankSend".to_string()) + .spawn(move || { + let _ = Self::bank_send_loop( + exit, + verified_vote_label_packets_receiver, + poh_recorder, + &verified_packets_sender, + ); + }) + .unwrap() + }; let send_thread = Builder::new() .name("solCiProcVotes".to_string()) @@ -1447,7 +1449,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -1563,7 +1565,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index 2cfc20146d03b3..11e096eef9d203 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -56,7 +56,7 @@ pub struct AggregateCommitmentService { impl AggregateCommitmentService { pub fn new( - exit: &Arc, + exit: Arc, block_commitment_cache: Arc>, subscriptions: Arc, ) -> (Sender, Self) { @@ -64,19 +64,18 @@ impl AggregateCommitmentService { Sender, Receiver, ) = unbounded(); - let exit_ = exit.clone(); ( sender, Self { t_commitment: Builder::new() .name("solAggCommitSvc".to_string()) .spawn(move || loop { - if exit_.load(Ordering::Relaxed) { + if exit.load(Ordering::Relaxed) { break; } if let Err(RecvTimeoutError::Disconnected) = - Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_) + Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit) { break; } @@ -90,7 +89,7 @@ impl AggregateCommitmentService { receiver: &Receiver, block_commitment_cache: &RwLock, subscriptions: &Arc, - exit: &Arc, + exit: &AtomicBool, ) -> Result<(), RecvTimeoutError> { loop { if exit.load(Ordering::Relaxed) { diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index dba259b7889902..3d7dd43b748b6d 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -32,10 +32,9 @@ impl CompletedDataSetsService { completed_sets_receiver: CompletedDataSetsReceiver, blockstore: Arc, rpc_subscriptions: Arc, - exit: &Arc, + exit: Arc, max_slots: Arc, ) -> Self { - let exit = exit.clone(); let thread_hdl = Builder::new() .name("solComplDataSet".to_string()) .spawn(move || loop { diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 196770a1eafda0..b3eb36201fc637 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -34,7 +34,7 @@ impl FetchStage { sockets: Vec, tpu_forwards_sockets: Vec, tpu_vote_sockets: Vec, - exit: &Arc, + exit: Arc, poh_recorder: &Arc>, coalesce: Duration, ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { @@ -66,7 +66,7 @@ impl FetchStage { sockets: Vec, tpu_forwards_sockets: Vec, tpu_vote_sockets: Vec, - exit: &Arc, + exit: Arc, sender: &PacketBatchSender, vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, @@ -142,7 +142,7 @@ impl FetchStage { tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, - exit: &Arc, + exit: Arc, sender: &PacketBatchSender, vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, @@ -234,7 +234,6 @@ impl FetchStage { }) .unwrap(); - let exit = exit.clone(); let metrics_thread_hdl = Builder::new() .name("solFetchStgMetr".to_string()) .spawn(move || loop { diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 4f9deb14b2a72b..f60967e01663e3 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -49,9 +49,8 @@ impl LedgerCleanupService { new_root_receiver: Receiver, blockstore: Arc, max_ledger_shreds: u64, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let mut last_purge_slot = 0; info!( diff --git a/core/src/ledger_metric_report_service.rs b/core/src/ledger_metric_report_service.rs index 1f8636bff6a546..2e91013eb991b8 100644 --- a/core/src/ledger_metric_report_service.rs +++ b/core/src/ledger_metric_report_service.rs @@ -23,12 +23,11 @@ pub struct LedgerMetricReportService { } impl LedgerMetricReportService { - pub fn new(blockstore: Arc, exit: &Arc) -> Self { - let exit_signal = exit.clone(); + pub fn new(blockstore: Arc, exit: Arc) -> Self { let t_cf_metric = Builder::new() .name("solRocksCfMtrcs".to_string()) .spawn(move || loop { - if exit_signal.load(Ordering::Relaxed) { + if exit.load(Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis( diff --git a/core/src/poh_timing_report_service.rs b/core/src/poh_timing_report_service.rs index bc84176525ef6e..6b4d7573cab1be 100644 --- a/core/src/poh_timing_report_service.rs +++ b/core/src/poh_timing_report_service.rs @@ -24,13 +24,12 @@ pub struct PohTimingReportService { } impl PohTimingReportService { - pub fn new(receiver: PohTimingReceiver, exit: &Arc) -> Self { - let exit_signal = exit.clone(); + pub fn new(receiver: PohTimingReceiver, exit: Arc) -> Self { let mut poh_timing_reporter = PohTimingReporter::default(); let t_poh_timing = Builder::new() .name("solPohTimingRpt".to_string()) .spawn(move || loop { - if exit_signal.load(Ordering::Relaxed) { + if exit.load(Ordering::Relaxed) { break; } if let Ok(SlotPohTimingInfo { @@ -65,7 +64,7 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); // Create the service let poh_timing_report_service = - PohTimingReportService::new(poh_timing_point_receiver, &exit); + PohTimingReportService::new(poh_timing_point_receiver, exit.clone()); // Send SlotPohTimingPoint let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_start_poh_time_point( diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index ab7decc4b0722c..e9c7d29527b319 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -536,7 +536,7 @@ impl ReplayStage { trace!("replay stage"); // Start the replay stage loop let (lockouts_sender, commitment_service) = AggregateCommitmentService::new( - &exit, + exit.clone(), block_commitment_cache.clone(), rpc_subscriptions.clone(), ); @@ -3971,7 +3971,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -4541,7 +4541,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -4614,7 +4614,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -4622,7 +4622,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )); let (lockouts_sender, _) = AggregateCommitmentService::new( - &exit, + exit, block_commitment_cache.clone(), rpc_subscriptions, ); diff --git a/core/src/rewards_recorder_service.rs b/core/src/rewards_recorder_service.rs index 092b6eae1ad7d6..dda6ec3513256f 100644 --- a/core/src/rewards_recorder_service.rs +++ b/core/src/rewards_recorder_service.rs @@ -32,9 +32,8 @@ impl RewardsRecorderService { rewards_receiver: RewardsRecorderReceiver, max_complete_rewards_slot: Arc, blockstore: Arc, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let thread_hdl = Builder::new() .name("solRewardsWritr".to_string()) .spawn(move || loop { diff --git a/core/src/sample_performance_service.rs b/core/src/sample_performance_service.rs index a5395d78ea1c7c..05e6ee36622442 100644 --- a/core/src/sample_performance_service.rs +++ b/core/src/sample_performance_service.rs @@ -22,9 +22,8 @@ impl SamplePerformanceService { pub fn new( bank_forks: &Arc>, blockstore: &Arc, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let blockstore = blockstore.clone(); let bank_forks = bank_forks.clone(); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 95383f61e71129..e515d86533b1e3 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -114,7 +114,7 @@ impl ShredFetchStage { #[allow(clippy::too_many_arguments)] fn packet_modifier( sockets: Vec>, - exit: &Arc, + exit: Arc, sender: Sender, recycler: PacketBatchRecycler, bank_forks: Arc>, @@ -170,13 +170,13 @@ impl ShredFetchStage { bank_forks: Arc>, cluster_info: Arc, turbine_disabled: Arc, - exit: &Arc, + exit: Arc, ) -> Self { let recycler = PacketBatchRecycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, - exit, + exit.clone(), sender.clone(), recycler.clone(), bank_forks.clone(), @@ -189,7 +189,7 @@ impl ShredFetchStage { let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier( forward_sockets, - exit, + exit.clone(), sender.clone(), recycler.clone(), bank_forks.clone(), diff --git a/core/src/stats_reporter_service.rs b/core/src/stats_reporter_service.rs index 90e72aaadb3e46..b192db1495d572 100644 --- a/core/src/stats_reporter_service.rs +++ b/core/src/stats_reporter_service.rs @@ -18,9 +18,8 @@ pub struct StatsReporterService { impl StatsReporterService { pub fn new( reporting_receiver: Receiver>, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let thread_hdl = Builder::new() .name("solStatsReport".to_owned()) .spawn(move || loop { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4d6beef5a32732..75bd7832dbbc22 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -87,7 +87,7 @@ impl Tpu { _entry_notification_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, - exit: &Arc, + exit: Arc, shred_version: u16, vote_tracker: Arc, bank_forks: Arc>, @@ -125,7 +125,7 @@ impl Tpu { transactions_sockets, tpu_forwards_sockets, tpu_vote_sockets, - exit, + exit.clone(), &packet_sender, &vote_packet_sender, &forwarded_packet_sender, @@ -234,7 +234,7 @@ impl Tpu { cluster_info.clone(), entry_receiver, retransmit_slots_receiver, - exit.clone(), + exit, blockstore.clone(), bank_forks, shred_version, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index a5a7fba4511cc4..6575f4b4ec9256 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -114,7 +114,7 @@ impl Tvu { maybe_process_block_store: Option, tower_storage: Arc, leader_schedule_cache: &Arc, - exit: &Arc, + exit: Arc, block_commitment_cache: Arc>, turbine_disabled: Arc, transaction_status_sender: Option, @@ -163,7 +163,7 @@ impl Tvu { bank_forks.clone(), cluster_info.clone(), turbine_disabled, - exit, + exit.clone(), ); let (verified_sender, verified_receiver) = unbounded(); @@ -313,12 +313,12 @@ impl Tvu { ledger_cleanup_slot_receiver, blockstore.clone(), max_ledger_shreds, - exit, + exit.clone(), ) }); let duplicate_shred_listener = DuplicateShredListener::new( - exit.clone(), + exit, cluster_info.clone(), DuplicateShredHandler::new( blockstore, @@ -448,7 +448,7 @@ pub mod tests { blockstore, ledger_signal_receiver, &Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit.clone(), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -459,7 +459,7 @@ pub mod tests { None, Arc::new(crate::tower_storage::FileTowerStorage::default()), &leader_schedule_cache, - &exit, + exit.clone(), block_commitment_cache, Arc::::default(), None, diff --git a/core/src/validator.rs b/core/src/validator.rs index b7c7c1e69369f4..cb82e1c4a44de0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -384,13 +384,12 @@ struct BlockstoreRootScan { } impl BlockstoreRootScan { - fn new(config: &ValidatorConfig, blockstore: &Arc, exit: &Arc) -> Self { + fn new(config: &ValidatorConfig, blockstore: &Arc, exit: Arc) -> Self { let thread = if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history && config.rpc_config.rpc_scan_and_fix_roots { let blockstore = blockstore.clone(); - let exit = exit.clone(); Some( Builder::new() .name("solBStoreRtScan".to_string()) @@ -616,7 +615,7 @@ impl Validator { ); let system_monitor_service = Some(SystemMonitorService::new( - Arc::clone(&exit), + exit.clone(), SystemMonitorStatsReportConfig { report_os_memory_stats: !config.no_os_memory_stats_reporting, report_os_network_stats: !config.no_os_network_stats_reporting, @@ -627,7 +626,7 @@ impl Validator { let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded(); let poh_timing_report_service = - PohTimingReportService::new(poh_timing_point_receiver, &exit); + PohTimingReportService::new(poh_timing_point_receiver, exit.clone()); let ( genesis_config, @@ -655,7 +654,7 @@ impl Validator { ) = load_blockstore( config, ledger_path, - &exit, + exit.clone(), &start_progress, accounts_update_notifier, transaction_notifier, @@ -811,7 +810,7 @@ impl Validator { Some(SamplePerformanceService::new( &bank_forks, &blockstore, - &exit, + exit.clone(), )) } else { None @@ -830,7 +829,7 @@ impl Validator { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config( - &exit, + exit.clone(), max_complete_transaction_status_slot.clone(), max_complete_rewards_slot.clone(), blockstore.clone(), @@ -848,7 +847,7 @@ impl Validator { completed_data_sets_receiver, blockstore.clone(), rpc_subscriptions.clone(), - &exit, + exit.clone(), max_slots.clone(), ); @@ -972,7 +971,7 @@ impl Validator { }, Some(OptimisticallyConfirmedBankTracker::new( bank_notification_receiver, - &exit, + exit.clone(), bank_forks.clone(), optimistically_confirmed_bank, rpc_subscriptions.clone(), @@ -1010,7 +1009,8 @@ impl Validator { let (stats_reporter_sender, stats_reporter_receiver) = unbounded(); - let stats_reporter_service = StatsReporterService::new(stats_reporter_receiver, &exit); + let stats_reporter_service = + StatsReporterService::new(stats_reporter_receiver, exit.clone()); let gossip_service = GossipService::new( &cluster_info, @@ -1019,7 +1019,7 @@ impl Validator { config.gossip_validators.clone(), should_check_duplicate_instance, Some(stats_reporter_sender.clone()), - &exit, + exit.clone(), ); let serve_repair = ServeRepair::new( cluster_info.clone(), @@ -1055,7 +1055,7 @@ impl Validator { }; let ledger_metric_report_service = - LedgerMetricReportService::new(blockstore.clone(), &exit); + LedgerMetricReportService::new(blockstore.clone(), exit.clone()); let wait_for_vote_to_start_leader = !waited_for_supermajority && !config.no_wait_for_vote_to_start_leader; @@ -1063,7 +1063,7 @@ impl Validator { let poh_service = PohService::new( poh_recorder.clone(), &genesis_config.poh_config, - &exit, + exit.clone(), bank_forks.read().unwrap().root_bank().ticks_per_slot(), config.poh_pinned_cpu_core, config.poh_hashes_per_batch, @@ -1127,7 +1127,7 @@ impl Validator { Some(process_blockstore), config.tower_storage.clone(), &leader_schedule_cache, - &exit, + exit.clone(), block_commitment_cache, config.turbine_disabled.clone(), transaction_status_sender.clone(), @@ -1178,7 +1178,7 @@ impl Validator { entry_notification_sender, &blockstore, &config.broadcast_stage_type, - &exit, + exit, node.info.shred_version(), vote_tracker, bank_forks.clone(), @@ -1527,7 +1527,7 @@ fn blockstore_options_from_config(config: &ValidatorConfig) -> BlockstoreOptions fn load_blockstore( config: &ValidatorConfig, ledger_path: &Path, - exit: &Arc, + exit: Arc, start_progress: &Arc>, accounts_update_notifier: Option, transaction_notifier: Option, @@ -1590,7 +1590,7 @@ fn load_blockstore( let original_blockstore_root = blockstore.last_root(); let blockstore = Arc::new(blockstore); - let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit); + let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit.clone()); let halt_at_slot = config .halt_at_slot .or_else(|| blockstore.highest_slot().unwrap_or(None)); @@ -1616,7 +1616,7 @@ fn load_blockstore( if enable_rpc_transaction_history || is_plugin_transaction_history_required { initialize_rpc_transaction_history_services( blockstore.clone(), - exit, + exit.clone(), enable_rpc_transaction_history, config.rpc_config.enable_extended_tx_metadata_storage, transaction_notifier, @@ -1625,8 +1625,8 @@ fn load_blockstore( TransactionHistoryServices::default() }; - let entry_notifier_service = - entry_notifier.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit)); + let entry_notifier_service = entry_notifier + .map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone())); let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( @@ -1643,7 +1643,7 @@ fn load_blockstore( .as_ref() .map(|service| service.sender()), accounts_update_notifier, - exit.clone(), + exit, ); // Before replay starts, set the callbacks in each of the banks in BankForks so that @@ -2002,7 +2002,7 @@ fn backup_and_clear_blockstore( fn initialize_rpc_transaction_history_services( blockstore: Arc, - exit: &Arc, + exit: Arc, enable_rpc_transaction_history: bool, enable_extended_tx_metadata_storage: bool, transaction_notifier: Option, @@ -2019,7 +2019,7 @@ fn initialize_rpc_transaction_history_services( transaction_notifier, blockstore.clone(), enable_extended_tx_metadata_storage, - exit, + exit.clone(), )); let max_complete_rewards_slot = Arc::new(AtomicU64::new(blockstore.max_root())); @@ -2029,7 +2029,7 @@ fn initialize_rpc_transaction_history_services( rewards_receiver, max_complete_rewards_slot.clone(), blockstore.clone(), - exit, + exit.clone(), )); let (cache_block_meta_sender, cache_block_meta_receiver) = unbounded(); diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index bad78c6ad9d5a1..0d325bd31bb4f4 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -116,8 +116,7 @@ mod tests { } impl CpuStatsUpdater { - pub fn new(exit: &Arc) -> Self { - let exit = exit.clone(); + pub fn new(exit: Arc) -> Self { let cpu_stats = Arc::new(CpuStats::default()); let cpu_stats_clone = cpu_stats.clone(); @@ -370,14 +369,14 @@ mod tests { receiver, blockstore.clone(), max_ledger_shreds, - &exit, + exit.clone(), )) } else { None }; let exit_cpu = Arc::new(AtomicBool::new(false)); - let sys = CpuStatsUpdater::new(&exit_cpu); + let sys = CpuStatsUpdater::new(exit_cpu.clone()); let mut shreds = VecDeque::new(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 4cf1cb54110729..fb2cf68aeb2e5c 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -39,7 +39,7 @@ impl GossipService { gossip_validators: Option>, should_check_duplicate_instance: bool, stats_reporter_sender: Option>>, - exit: &Arc, + exit: Arc, ) -> Self { let (request_sender, request_receiver) = unbounded(); let gossip_socket = Arc::new(gossip_socket); @@ -73,12 +73,10 @@ impl GossipService { should_check_duplicate_instance, exit.clone(), ); - let t_gossip = cluster_info.clone().gossip( - bank_forks, - response_sender, - gossip_validators, - exit.clone(), - ); + let t_gossip = + cluster_info + .clone() + .gossip(bank_forks, response_sender, gossip_validators, exit); let t_responder = streamer::responder( "Gossip", gossip_socket, @@ -144,7 +142,7 @@ pub fn discover( let (gossip_service, ip_echo, spy_ref) = make_gossip_node( keypair, entrypoint, - &exit, + exit.clone(), my_gossip_addr, my_shred_version, true, // should_check_duplicate_instance, @@ -302,7 +300,7 @@ fn spy( pub fn make_gossip_node( keypair: Keypair, entrypoint: Option<&SocketAddr>, - exit: &Arc, + exit: Arc, gossip_addr: Option<&SocketAddr>, shred_version: u16, should_check_duplicate_instance: bool, @@ -360,7 +358,7 @@ mod tests { None, true, // should_check_duplicate_instance None, - &exit, + exit.clone(), ); exit.store(true, Ordering::Relaxed); d.join().unwrap(); diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index aab7bdee6bb9f8..ac9acd20f6c21e 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -35,7 +35,7 @@ use { }, }; -fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSocket) { +fn test_node(exit: Arc) -> (Arc, GossipService, UdpSocket) { let keypair = Arc::new(Keypair::new()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let cluster_info = Arc::new(ClusterInfo::new( @@ -62,7 +62,7 @@ fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSoc fn test_node_with_bank( node_keypair: Arc, - exit: &Arc, + exit: Arc, bank_forks: Arc>, ) -> (Arc, GossipService, UdpSocket) { let mut test_node = Node::new_localhost_with_pubkey(&node_keypair.pubkey()); @@ -97,7 +97,7 @@ where F: Fn(&Vec<(Arc, GossipService, UdpSocket)>), { let exit = Arc::new(AtomicBool::new(false)); - let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect(); + let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); topo(&listen); let mut done = true; for i in 0..(num * 32) { @@ -229,11 +229,11 @@ pub fn cluster_info_retransmit() { solana_logger::setup(); let exit = Arc::new(AtomicBool::new(false)); trace!("c1:"); - let (c1, dr1, tn1) = test_node(&exit); + let (c1, dr1, tn1) = test_node(exit.clone()); trace!("c2:"); - let (c2, dr2, tn2) = test_node(&exit); + let (c2, dr2, tn2) = test_node(exit.clone()); trace!("c3:"); - let (c3, dr3, tn3) = test_node(&exit); + let (c3, dr3, tn3) = test_node(exit.clone()); let c1_contact_info = c1.my_contact_info(); c2.insert_info(c1_contact_info.clone()); @@ -315,7 +315,11 @@ pub fn cluster_info_scale() { let nodes: Vec<_> = vote_keypairs .into_iter() .map(|keypairs| { - test_node_with_bank(Arc::new(keypairs.node_keypair), &exit, bank_forks.clone()) + test_node_with_bank( + Arc::new(keypairs.node_keypair), + exit.clone(), + bank_forks.clone(), + ) }) .collect(); let ci0 = nodes[0].0.my_contact_info(); diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index db63057b5453af..c93fb601e27ea1 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -304,7 +304,7 @@ pub fn load_and_process_ledger( transaction_notifier, blockstore.clone(), false, - &exit, + exit.clone(), ); ( Some(TransactionStatusSender { diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs index b07f3ce1a27dd1..5e108c94e80578 100644 --- a/ledger/src/entry_notifier_service.rs +++ b/ledger/src/entry_notifier_service.rs @@ -28,8 +28,7 @@ pub struct EntryNotifierService { } impl EntryNotifierService { - pub fn new(entry_notifier: EntryNotifierLock, exit: &Arc) -> Self { - let exit = exit.clone(); + pub fn new(entry_notifier: EntryNotifierLock, exit: Arc) -> Self { let (entry_notification_sender, entry_notification_receiver) = unbounded(); let thread_hdl = Builder::new() .name("solEntryNotif".to_string()) diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 7ed84435034e25..6e2f3751034788 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -497,7 +497,7 @@ pub fn start_gossip_voter( // node later. node_keypair.insecure_clone(), Some(gossip_addr), - &exit, + exit.clone(), None, 0, false, diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 342d3c8f2cea1d..a78d45fd5b90a3 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -1066,7 +1066,7 @@ pub fn create_test_recorder( let poh_service = PohService::new( poh_recorder.clone(), &poh_config, - &exit, + exit.clone(), bank.ticks_per_slot(), crate::poh_service::DEFAULT_PINNED_CPU_CORE, crate::poh_service::DEFAULT_HASHES_PER_BATCH, diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index 4fcc918e5bb401..caa2c2a7c8770a 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -97,13 +97,12 @@ impl PohService { pub fn new( poh_recorder: Arc>, poh_config: &PohConfig, - poh_exit: &Arc, + poh_exit: Arc, ticks_per_slot: u64, pinned_cpu_core: usize, hashes_per_batch: u64, record_receiver: Receiver, ) -> Self { - let poh_exit_ = poh_exit.clone(); let poh_config = poh_config.clone(); let tick_producer = Builder::new() .name("solPohTickProd".to_string()) @@ -113,14 +112,14 @@ impl PohService { Self::low_power_tick_producer( poh_recorder, &poh_config, - &poh_exit_, + &poh_exit, record_receiver, ); } else { Self::short_lived_low_power_tick_producer( poh_recorder, &poh_config, - &poh_exit_, + &poh_exit, record_receiver, ); } @@ -133,7 +132,7 @@ impl PohService { } Self::tick_producer( poh_recorder, - &poh_exit_, + &poh_exit, ticks_per_slot, hashes_per_batch, record_receiver, @@ -143,7 +142,7 @@ impl PohService { ), ); } - poh_exit_.store(true, Ordering::Relaxed); + poh_exit.store(true, Ordering::Relaxed); }) .unwrap(); @@ -493,7 +492,7 @@ mod tests { let poh_service = PohService::new( poh_recorder.clone(), &poh_config, - &exit, + exit.clone(), 0, DEFAULT_PINNED_CPU_CORE, hashes_per_batch, diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 493b0395621696..dacfcd9b32ebb9 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -87,13 +87,12 @@ pub struct OptimisticallyConfirmedBankTracker { impl OptimisticallyConfirmedBankTracker { pub fn new( receiver: BankNotificationReceiver, - exit: &Arc, + exit: Arc, bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, slot_notification_subscribers: Option>>>, ) -> Self { - let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); let mut last_notified_confirmed_slot: Slot = 0; let mut highest_confirmed_slot: Slot = 0; @@ -101,7 +100,7 @@ impl OptimisticallyConfirmedBankTracker { let thread_hdl = Builder::new() .name("solOpConfBnkTrk".to_string()) .spawn(move || loop { - if exit_.load(Ordering::Relaxed) { + if exit.load(Ordering::Relaxed) { break; } @@ -433,7 +432,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 70db285563a096..b08a6ed7b83d1b 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -382,12 +382,12 @@ impl JsonRpcRequestProcessor { CommitmentSlots::new_from_slot(bank.slot()), ))), blockstore, - validator_exit: create_validator_exit(&exit), + validator_exit: create_validator_exit(exit.clone()), health: Arc::new(RpcHealth::new( cluster_info.clone(), None, 0, - exit.clone(), + exit, Arc::clone(bank.get_startup_verification_complete()), )), cluster_info, @@ -4526,10 +4526,9 @@ fn sanitize_transaction( .map_err(|err| Error::invalid_params(format!("invalid transaction: {err}"))) } -pub fn create_validator_exit(exit: &Arc) -> Arc> { +pub fn create_validator_exit(exit: Arc) -> Arc> { let mut validator_exit = Exit::default(); - let exit_ = exit.clone(); - validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); + validator_exit.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed))); Arc::new(RwLock::new(validator_exit)) } @@ -4596,7 +4595,7 @@ pub fn populate_blockstore_for_tests( None, blockstore, false, - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), ); // Check that process_entries successfully writes can_commit transactions statuses, and @@ -4780,7 +4779,7 @@ pub mod tests { let leader_pubkey = *bank.collector_id(); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit); let cluster_info = Arc::new(new_test_cluster_info()); let identity = cluster_info.id(); cluster_info.insert_info(ContactInfo::new_with_socketaddr( @@ -6401,7 +6400,7 @@ pub mod tests { #[test] fn test_rpc_send_transaction_preflight() { let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit.clone()); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); @@ -6672,7 +6671,7 @@ pub mod tests { #[test] fn test_rpc_processor_get_block_commitment() { let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit.clone()); let bank_forks = new_bank_forks().0; let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); @@ -8287,7 +8286,7 @@ pub mod tests { #[test] fn test_rpc_single_gossip() { let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit.clone()); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); @@ -8312,7 +8311,7 @@ pub mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot.clone(), max_complete_rewards_slot.clone(), bank_forks.clone(), diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index 7c40664fddc2ce..59911a248b44fd 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -688,7 +688,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -878,7 +878,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -1006,7 +1006,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -1145,7 +1145,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -1201,7 +1201,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -1356,7 +1356,7 @@ mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 0d24449e5b3bff..2dd7e20b86bffb 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -492,7 +492,7 @@ mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 01fd39ad962d85..4fdde57c31a9fe 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -615,7 +615,7 @@ mod tests { .. } = create_genesis_config(10_000); let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit.clone()); let bank = Bank::new_for_tests(&genesis_config); let cluster_info = Arc::new(new_test_cluster_info()); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 2772647111abb1..3c8f8427a8ddaf 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -527,7 +527,7 @@ impl Drop for RpcSubscriptions { impl RpcSubscriptions { pub fn new( - exit: &Arc, + exit: Arc, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, blockstore: Arc, @@ -549,7 +549,7 @@ impl RpcSubscriptions { } pub fn new_for_tests( - exit: &Arc, + exit: Arc, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, bank_forks: Arc>, @@ -572,7 +572,7 @@ impl RpcSubscriptions { } pub fn new_for_tests_with_blockstore( - exit: &Arc, + exit: Arc, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, blockstore: Arc, @@ -608,7 +608,7 @@ impl RpcSubscriptions { } pub fn new_with_config( - exit: &Arc, + exit: Arc, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, blockstore: Arc, @@ -620,7 +620,6 @@ impl RpcSubscriptions { ) -> Self { let (notification_sender, notification_receiver) = crossbeam_channel::unbounded(); - let exit_clone = exit.clone(); let subscriptions = SubscriptionsTracker::new(bank_forks.clone()); let (broadcast_sender, _) = broadcast::channel(config.queue_capacity_items); @@ -636,6 +635,7 @@ impl RpcSubscriptions { let t_cleanup = if notification_threads == 0 { None } else { + let exit = exit.clone(); Some( Builder::new() .name("solRpcNotifier".to_string()) @@ -650,7 +650,7 @@ impl RpcSubscriptions { rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); } Self::process_notifications( - exit_clone, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore, @@ -680,7 +680,7 @@ impl RpcSubscriptions { Some(notification_sender) }, t_cleanup, - exit: exit.clone(), + exit, control, } } @@ -697,7 +697,7 @@ impl RpcSubscriptions { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); Self::new( - &Arc::new(AtomicBool::new(false)), + Arc::new(AtomicBool::new(false)), max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore, @@ -1333,7 +1333,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -1478,7 +1478,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore.clone(), @@ -1598,7 +1598,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore.clone(), @@ -1716,7 +1716,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, blockstore.clone(), @@ -1849,7 +1849,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -1999,7 +1999,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -2175,7 +2175,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -2290,7 +2290,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -2483,7 +2483,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -2659,7 +2659,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -2706,7 +2706,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks, @@ -2767,7 +2767,7 @@ pub(crate) mod tests { let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let max_complete_rewards_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), @@ -2970,7 +2970,7 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( - &exit, + exit, max_complete_transaction_status_slot, max_complete_rewards_slot, bank_forks.clone(), diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index e47673123ec86b..d210365789c731 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -32,9 +32,8 @@ impl TransactionStatusService { transaction_notifier: Option, blockstore: Arc, enable_extended_tx_metadata_storage: bool, - exit: &Arc, + exit: Arc, ) -> Self { - let exit = exit.clone(); let thread_hdl = Builder::new() .name("solTxStatusWrtr".to_string()) .spawn(move || loop { @@ -442,7 +441,7 @@ pub(crate) mod tests { Some(test_notifier.clone()), blockstore, false, - &exit, + exit.clone(), ); transaction_status_sender diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs index 68e98d3bfb42c8..db5ae2b35f7fa1 100644 --- a/runtime/src/accounts_index_storage.rs +++ b/runtime/src/accounts_index_storage.rs @@ -58,7 +58,7 @@ impl BgThreads { in_mem: &[Arc>], threads: usize, can_advance_age: bool, - exit: &Arc, + exit: Arc, ) -> Self { // stop signal used for THIS batch of bg threads let local_exit = Arc::new(AtomicBool::default()); @@ -68,8 +68,8 @@ impl BgThreads { // the first thread we start is special let can_advance_age = can_advance_age && idx == 0; let storage_ = Arc::clone(storage); - let local_exit_ = Arc::clone(&local_exit); - let system_exit_ = Arc::clone(exit); + let local_exit = local_exit.clone(); + let system_exit = exit.clone(); let in_mem_ = in_mem.to_vec(); // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock @@ -77,7 +77,7 @@ impl BgThreads { .name(format!("solIdxFlusher{idx:02}")) .spawn(move || { storage_.background( - vec![local_exit_, system_exit_], + vec![local_exit, system_exit], in_mem_, can_advance_age, ); @@ -123,7 +123,7 @@ impl + Into> AccountsIndexStorage< &self.in_mem, Self::num_threads(), false, // cannot advance age from any of these threads - &self.exit, + self.exit.clone(), )); } self.storage.set_startup(value); @@ -167,7 +167,7 @@ impl + Into> AccountsIndexStorage< .collect::>(); Self { - _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, &exit), + _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()), storage, in_mem, startup_worker_threads: Mutex::default(), diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 5bf233572c1682..35b0099f2b3bb8 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -103,7 +103,7 @@ pub type Result = std::result::Result; fn recv_loop( socket: &UdpSocket, - exit: Arc, + exit: &AtomicBool, packet_batch_sender: &PacketBatchSender, recycler: &PacketBatchRecycler, stats: &StreamerReceiveStats, @@ -173,7 +173,7 @@ pub fn receiver( .spawn(move || { let _ = recv_loop( &socket, - exit, + &exit, &packet_batch_sender, &recycler, &stats, diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 9c80a0dab67858..82e36ea6d8d703 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -920,7 +920,7 @@ mod tests { SocketAddrSpace::Unspecified, )); let exit = Arc::new(AtomicBool::new(false)); - let validator_exit = create_validator_exit(&exit); + let validator_exit = create_validator_exit(exit); let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig { secondary_indexes: config.account_indexes, }); diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index c2777189db6e3f..5c7b21ffc6a7df 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -171,7 +171,7 @@ fn start_gossip_node( gossip_validators, should_check_duplicate_instance, None, - &gossip_exit_flag, + gossip_exit_flag.clone(), ); (cluster_info, gossip_exit_flag, gossip_service) } diff --git a/validator/src/dashboard.rs b/validator/src/dashboard.rs index a904172cda6dc7..d93798e998ef54 100644 --- a/validator/src/dashboard.rs +++ b/validator/src/dashboard.rs @@ -195,7 +195,7 @@ impl Dashboard { async fn wait_for_validator_startup( ledger_path: &Path, - exit: &Arc, + exit: &AtomicBool, progress_bar: ProgressBar, refresh_interval: Duration, ) -> Option<(SocketAddr, SystemTime)> {