Skip to content

Commit

Permalink
Add timeout support to WaitingPeer (MystenLabs#5592)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Oct 27, 2022
1 parent 56de844 commit 1541b38
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 28 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ members = [
]

[workspace.package]
# This version string will be inheritted by sui-core, sui-faucet, sui-node, sui-tools, sui-sdk, and sui crates
# This version string will be inherited by sui-core, sui-faucet, sui-node, sui-tools, sui-sdk, and sui crates
version = "0.14.0"

[profile.release]
Expand Down Expand Up @@ -101,9 +101,9 @@ move-prover-boogie-backend = { git = "https://github.com/move-language/move", re
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "7766b1ef23f6bed6da9ff9e50dc2a0d1957d344d" }

# anemo dependencies
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e" }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e" }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e" }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909" }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909" }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909" }

# Use the same workspace-hack across crates.
workspace-hack = { path = "crates/workspace-hack" }
Expand Down
10 changes: 5 additions & 5 deletions crates/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ aes = { version = "0.8", default-features = false }
aes-gcm = { version = "0.10", features = ["aes", "alloc", "getrandom"] }
ahash = { version = "0.7", features = ["std"] }
aho-corasick = { version = "0.7", features = ["std"] }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909", default-features = false }
ansi_term = { version = "0.12", default-features = false }
anyhow = { version = "1", features = ["backtrace", "std"] }
arc-swap = { version = "1", default-features = false, features = ["serde"] }
Expand Down Expand Up @@ -638,9 +638,9 @@ aes = { version = "0.8", default-features = false }
aes-gcm = { version = "0.10", features = ["aes", "alloc", "getrandom"] }
ahash = { version = "0.7", features = ["std"] }
aho-corasick = { version = "0.7", features = ["std"] }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e", default-features = false }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "7da7c9a1913ed7fadbdd92ebc1b9f48e0c8cef0e", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909", default-features = false }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "f514987fa7f731058dd5a56e409e68600d84d909", default-features = false }
ansi_term = { version = "0.12", default-features = false }
anyhow = { version = "1", features = ["backtrace", "std"] }
arc-swap = { version = "1", default-features = false, features = ["serde"] }
Expand Down
48 changes: 32 additions & 16 deletions narwhal/network/src/anemo_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use anemo::PeerId;
use anemo::Request;
use anemo::Response;
use bytes::Bytes;
use futures::future::OptionFuture;
use futures::FutureExt;
use std::time::Instant;

pub trait NetworkExt {
fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
Expand All @@ -33,9 +35,10 @@ impl WaitingPeer {
Self { peer_id, network }
}

async fn do_rpc(self, request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
async fn do_rpc(self, mut request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
use tokio::sync::broadcast::error::RecvError;

let start = Instant::now();
let (mut subscriber, _) = self.network.subscribe();

// If we're connected with the peer immediately make the request
Expand All @@ -44,29 +47,42 @@ impl WaitingPeer {
}

// If we're not connected we'll need to check to see if the Peer is a KnownPeer
let timeout = request.timeout();
let sleep: OptionFuture<_> = timeout.map(tokio::time::sleep).into();
tokio::pin!(sleep);
loop {
if self.network.known_peers().get(&self.peer_id).is_none() {
return Err(format!("peer {} is not a known peer", self.peer_id).into());
}

match subscriber.recv().await {
Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
// We're now connected with the peer, lets try to make a network request
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
tokio::select! {
recv = subscriber.recv() => match recv {
Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
// We're now connected with the peer, lets try to make a network request
if let Some(mut peer) = self.network.peer(self.peer_id) {
if let Some(duration) = timeout {
// Reduce timeout to account for time already spent waiting
// for the peer.
request.set_timeout(duration.saturating_sub(Instant::now().duration_since(start)));
}
return peer.rpc(request).await.map_err(Into::into);
}
}
}
Err(RecvError::Closed) => return Err("network is closed".into()),
Err(RecvError::Lagged(_)) => {
subscriber = subscriber.resubscribe();
Err(RecvError::Closed) => return Err("network is closed".into()),
Err(RecvError::Lagged(_)) => {
subscriber = subscriber.resubscribe();

// We lagged behind so we may have missed the connection event
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
// We lagged behind so we may have missed the connection event
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
}
}
}
// Just do another iteration
_ => {}
// Just do another iteration
_ => {}
},
Some(_) = &mut sleep => {
return Err(format!("timed out waiting for peer {}", self.peer_id).into());
},
}
}
}
Expand Down

0 comments on commit 1541b38

Please sign in to comment.