Skip to content

Commit

Permalink
narwhal-network: add WaitingPeer (MystenLabs#5277)
Browse files Browse the repository at this point in the history
Introduce the `NetworkExt` trait for adding extention methods to an
`anemo::Network` as well as a `WaitingPeer` type which enables queuing
up an rpc to a known but currently disconnected peer.
  • Loading branch information
bmwill authored Oct 17, 2022
1 parent 1cf887a commit 94d125a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions narwhal/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "57c4af7d
anyhow = "1.0.65"
axum = "0.5.16"
axum-server = "0.4.2"
tower = "0.4.13"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
93 changes: 93 additions & 0 deletions narwhal/network/src/anemo_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anemo::codegen::BoxError;
use anemo::codegen::BoxFuture;
use anemo::codegen::Service;
use anemo::types::PeerEvent;
use anemo::Network;
use anemo::PeerId;
use anemo::Request;
use anemo::Response;
use bytes::Bytes;
use futures::FutureExt;

pub trait NetworkExt {
fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer;
}

impl NetworkExt for Network {
fn waiting_peer(&self, peer_id: PeerId) -> WaitingPeer {
WaitingPeer::new(self.clone(), peer_id)
}
}

#[derive(Clone)]
pub struct WaitingPeer {
peer_id: PeerId,
network: Network,
}

impl WaitingPeer {
pub fn new(network: Network, peer_id: PeerId) -> Self {
Self { peer_id, network }
}

async fn do_rpc(self, request: Request<Bytes>) -> Result<Response<Bytes>, BoxError> {
use tokio::sync::broadcast::error::RecvError;

let (mut subscriber, _) = self.network.subscribe();

// If we're connected with the peer immediately make the request
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
}

// If we're not connected we'll need to check to see if the Peer is a KnownPeer
loop {
if self.network.known_peers().get(&self.peer_id).is_none() {
return Err(format!("peer {} is not a known peer", self.peer_id).into());
}

match subscriber.recv().await {
Ok(PeerEvent::NewPeer(peer_id)) if peer_id == self.peer_id => {
// We're now connected with the peer, lets try to make a network request
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
}
}
Err(RecvError::Closed) => return Err("network is closed".into()),
Err(RecvError::Lagged(_)) => {
subscriber = subscriber.resubscribe();

// We lagged behind so we may have missed the connection event
if let Some(mut peer) = self.network.peer(self.peer_id) {
return peer.rpc(request).await.map_err(Into::into);
}
}
// Just do another iteration
_ => {}
}
}
}
}

impl Service<Request<Bytes>> for WaitingPeer {
type Response = Response<Bytes>;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

#[inline]
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, request: Request<Bytes>) -> Self::Future {
let peer = self.clone();
peer.do_rpc(request).boxed()
}
}
1 change: 1 addition & 0 deletions narwhal/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#![allow(clippy::async_yields_async)]

pub mod admin;
pub mod anemo_ext;
mod bounded_executor;
pub mod connectivity;
pub mod metrics;
Expand Down

0 comments on commit 94d125a

Please sign in to comment.