Skip to content

Commit

Permalink
[full node] Fix memory leak at full node follower (MystenLabs#3874)
Browse files Browse the repository at this point in the history
* Fix mem leak at full node follower

* Clean up pending-downloads

* Notify waiters even in the case of an error

* Add comment

* Simplify the API and remove memory leak

Co-authored-by: George Danezis <[email protected]>
Co-authored-by: Mark Logan <[email protected]>
  • Loading branch information
3 people authored Aug 9, 2022
1 parent c8dd20e commit 548564d
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions crates/sui-core/src/node_sync/node_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl EffectsStakeMap {

pub fn forget_effects(&mut self, digests: &TransactionEffectsDigest) {
self.effects_stake_map.remove(digests);
self.effects_vote_map.remove(digests);
}
}

Expand All @@ -131,24 +132,18 @@ where
}
}

/// Returns (Some(tx), rx) if there are no other waiters yet, or else (None, rx).
/// All rxes can be woken by sending to the supplied tx, or by calling notify(key, result)
async fn wait(
&self,
key: &Key,
) -> (
Option<broadcast::Sender<ResultT>>,
broadcast::Receiver<ResultT>,
) {
/// Returns (true, rx) if there are no other waiters yet, or else (false, rx).
/// All rxes can be woken by calling notify(key, result)
async fn wait(&self, key: &Key) -> (bool, broadcast::Receiver<ResultT>) {
let waiters = &mut self.waiters.lock().unwrap();
let entry = waiters.entry(key.clone());

match entry {
hash_map::Entry::Occupied(e) => (None, e.get().subscribe()),
hash_map::Entry::Occupied(e) => (false, e.get().subscribe()),
hash_map::Entry::Vacant(e) => {
let (tx, rx) = broadcast::channel(1);
e.insert(tx.clone());
(Some(tx), rx)
e.insert(tx);
(true, rx)
}
}
}
Expand Down Expand Up @@ -261,7 +256,7 @@ pub struct NodeSyncState<A> {
aggregator: Arc<AuthorityAggregator<A>>,

// Used to single-shot multiple concurrent downloads.
pending_downloads: Waiter<TransactionDigest, SuiResult>,
pending_downloads: Arc<Waiter<TransactionDigest, SuiResult>>,

// Used to wait for parent transactions to be applied locally
pending_txes: Waiter<TransactionDigest, ()>,
Expand All @@ -285,7 +280,7 @@ impl<A> NodeSyncState<A> {
state,
aggregator,
node_sync_store,
pending_downloads: Waiter::new(),
pending_downloads: Arc::new(Waiter::new()),
pending_txes: Waiter::new(),
sender,
receiver: Arc::new(tokio::sync::Mutex::new(receiver)),
Expand Down Expand Up @@ -344,17 +339,18 @@ where
let res = state.process_digest(sync_arg, permit).await;
if let Err(error) = &res {
error!(?sync_arg, "process_digest failed: {}", error);
} else {
let digest = sync_arg.transaction_digest();
trace!(?digest, "notifying waiters");
state
.pending_txes
.notify(digest, ())
.await
.tap_err(|e| debug!(?digest, "{}", e))
.ok();
}

// Notify waiters even if tx failed, to avoid leaking resources.
let digest = sync_arg.transaction_digest();
trace!(?digest, "notifying waiters");
state
.pending_txes
.notify(digest, ())
.await
.tap_err(|e| debug!(?digest, "{}", e))
.ok();

if let Some(tx) = tx {
// Send status back to follower so that it knows whether to advance
// the watermark.
Expand Down Expand Up @@ -563,24 +559,31 @@ where
authorities_with_cert: Option<BTreeSet<AuthorityName>>,
req: &DownloadRequest,
) -> SuiResult<(CertifiedTransaction, SignedTransactionEffects)> {
let tx_digest = req.transaction_digest();
if let Some(c) = self.node_sync_store.get_cert_and_effects(tx_digest)? {
let tx_digest = *req.transaction_digest();
if let Some(c) = self.node_sync_store.get_cert_and_effects(&tx_digest)? {
return Ok(c);
}

let (tx, mut rx) = self.pending_downloads.wait(tx_digest).await;
let pending_downloads = self.pending_downloads.clone();
let (first, mut rx) = pending_downloads.wait(&tx_digest).await;
// Only start the download if there are no other concurrent downloads.
if let Some(tx) = tx {
if first {
let aggregator = self.aggregator.clone();
let node_sync_store = self.node_sync_store.clone();
let req = req.clone();
tokio::task::spawn(async move {
if let Err(error) = tx.send(
Self::download_impl(authorities_with_cert, aggregator, &req, node_sync_store)
let _ = pending_downloads
.notify(
&tx_digest,
Self::download_impl(
authorities_with_cert,
aggregator,
&req,
node_sync_store,
)
.await,
) {
error!(?req, ?error, "Could not broadcast cert response");
}
)
.await;
});
}

Expand All @@ -591,7 +594,7 @@ where
})??;

self.node_sync_store
.get_cert_and_effects(tx_digest)?
.get_cert_and_effects(&tx_digest)?
.ok_or_else(|| SuiError::GenericAuthorityError {
error: format!(
"cert/effects for {:?} should have been in the node_sync_store",
Expand Down

0 comments on commit 548564d

Please sign in to comment.