Skip to content

Commit

Permalink
[network] Add ability to drop network messages in framework
Browse files Browse the repository at this point in the history
THe `get_next_network_msg` function doesn't work for RPC, as it will
hang onto the oneshot sender.  This drops it so that the tasks will
continue.
  • Loading branch information
gregnazario authored and bors-libra committed Nov 13, 2021
1 parent a0d5d7d commit 52a1a1e
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions network/src/testutils/test_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use crate::{
application::storage::PeerMetadataStorage,
peer_manager::{ConnectionNotification, PeerManagerNotification, PeerManagerRequest},
protocols::{direct_send::Message, rpc::InboundRpcRequest},
protocols::{
direct_send::Message,
rpc::{InboundRpcRequest, OutboundRpcRequest},
},
transport::ConnectionMetadata,
DisconnectReason, ProtocolId,
};
Expand Down Expand Up @@ -297,15 +300,42 @@ pub trait TestNode: ApplicationNode + Sync {
)
}

/// Gets the next queued network message on `Node`'s network (`NetworkId`). Doesn't propagate
/// to downstream node
/// Gets the next queued network message on `Node`'s network [`NetworkId`]. Doesn't propagate
/// to downstream node. If dropping a message use [`TestNode::drop_next_network_msg`]
async fn get_next_network_msg(&mut self, network_id: NetworkId) -> PeerManagerRequest {
self.get_outbound_handle(network_id)
.next()
.await
.expect("Expecting a message")
}

/// Drop a network message. This is required over [`TestNode::get_network_msg`] because the
/// oneshot channel must be dropped.
async fn drop_next_network_msg(
&mut self,
network_id: NetworkId,
) -> (PeerId, ProtocolId, bytes::Bytes) {
let message = self.get_next_network_msg(network_id).await;
match message {
PeerManagerRequest::SendRpc(
peer_id,
OutboundRpcRequest {
protocol_id,
res_tx,
data,
..
},
) => {
// Forcefully close the oneshot channel, otherwise listening task will hang forever.
drop(res_tx);
(peer_id, protocol_id, data)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
(peer_id, message.protocol_id, message.mdata)
}
}
}

/// Sends the next queued network message on `Node`'s network (`NetworkId`)
async fn send_next_network_msg(&mut self, network_id: NetworkId) {
let request = self.get_next_network_msg(network_id).await;
Expand Down

0 comments on commit 52a1a1e

Please sign in to comment.