From 52a1a1eba74257796e19b3c1ab84ec06bff96e75 Mon Sep 17 00:00:00 2001 From: Greg Nazario Date: Mon, 8 Nov 2021 17:05:39 -0800 Subject: [PATCH] [network] Add ability to drop network messages in framework THe `get_next_network_msg` function doesn't work for RPC, as it will hang onto the oneshot sender. This drops it so that the tasks will continue. --- network/src/testutils/test_node.rs | 36 +++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/network/src/testutils/test_node.rs b/network/src/testutils/test_node.rs index d846bcf7a2..fb2a48eb09 100644 --- a/network/src/testutils/test_node.rs +++ b/network/src/testutils/test_node.rs @@ -4,7 +4,10 @@ use crate::{ application::storage::PeerMetadataStorage, peer_manager::{ConnectionNotification, PeerManagerNotification, PeerManagerRequest}, - protocols::{direct_send::Message, rpc::InboundRpcRequest}, + protocols::{ + direct_send::Message, + rpc::{InboundRpcRequest, OutboundRpcRequest}, + }, transport::ConnectionMetadata, DisconnectReason, ProtocolId, }; @@ -297,8 +300,8 @@ pub trait TestNode: ApplicationNode + Sync { ) } - /// Gets the next queued network message on `Node`'s network (`NetworkId`). Doesn't propagate - /// to downstream node + /// Gets the next queued network message on `Node`'s network [`NetworkId`]. Doesn't propagate + /// to downstream node. If dropping a message use [`TestNode::drop_next_network_msg`] async fn get_next_network_msg(&mut self, network_id: NetworkId) -> PeerManagerRequest { self.get_outbound_handle(network_id) .next() @@ -306,6 +309,33 @@ pub trait TestNode: ApplicationNode + Sync { .expect("Expecting a message") } + /// Drop a network message. This is required over [`TestNode::get_network_msg`] because the + /// oneshot channel must be dropped. + async fn drop_next_network_msg( + &mut self, + network_id: NetworkId, + ) -> (PeerId, ProtocolId, bytes::Bytes) { + let message = self.get_next_network_msg(network_id).await; + match message { + PeerManagerRequest::SendRpc( + peer_id, + OutboundRpcRequest { + protocol_id, + res_tx, + data, + .. + }, + ) => { + // Forcefully close the oneshot channel, otherwise listening task will hang forever. + drop(res_tx); + (peer_id, protocol_id, data) + } + PeerManagerRequest::SendDirectSend(peer_id, message) => { + (peer_id, message.protocol_id, message.mdata) + } + } + } + /// Sends the next queued network message on `Node`'s network (`NetworkId`) async fn send_next_network_msg(&mut self, network_id: NetworkId) { let request = self.get_next_network_msg(network_id).await;