Skip to content

Commit

Permalink
RPC Notifier Signal when Setup Complete (solana-labs#27481)
Browse files Browse the repository at this point in the history
* RPC notifier signal when ready
  • Loading branch information
bw-solana authored Sep 1, 2022
1 parent 9b8bed8 commit 242c9cb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ impl Validator {
block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
&config.pubsub_config,
None,
));

let max_slots = Arc::new(MaxSlots::default());
Expand Down
27 changes: 23 additions & 4 deletions rpc/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ impl RpcSubscriptions {
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default(),
None,
)
}

Expand All @@ -573,14 +574,13 @@ impl RpcSubscriptions {
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);

Self::new_with_config(
Self::new_for_tests_with_blockstore(
exit,
max_complete_transaction_status_slot,
blockstore,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default_for_tests(),
)
}

Expand All @@ -592,15 +592,30 @@ impl RpcSubscriptions {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> Self {
Self::new_with_config(
let rpc_notifier_ready = Arc::new(AtomicBool::new(false));

let rpc_subscriptions = Self::new_with_config(
exit,
max_complete_transaction_status_slot,
blockstore,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default_for_tests(),
)
Some(rpc_notifier_ready.clone()),
);

// Ensure RPC notifier is ready to receive notifications before proceeding
let start_time = Instant::now();
loop {
if rpc_notifier_ready.load(Ordering::Relaxed) {
break;
} else if (Instant::now() - start_time).as_millis() > 5000 {
panic!("RPC notifier thread setup took too long");
}
}

rpc_subscriptions
}

pub fn new_with_config(
Expand All @@ -611,6 +626,7 @@ impl RpcSubscriptions {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
config: &PubSubConfig,
rpc_notifier_ready: Option<Arc<AtomicBool>>,
) -> Self {
let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();

Expand Down Expand Up @@ -640,6 +656,9 @@ impl RpcSubscriptions {
.build()
.unwrap();
pool.install(|| {
if let Some(rpc_notifier_ready) = rpc_notifier_ready {
rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
}
Self::process_notifications(
exit_clone,
max_complete_transaction_status_slot,
Expand Down

0 comments on commit 242c9cb

Please sign in to comment.