diff --git a/network/src/blocks/blocks.rs b/network/src/blocks/blocks.rs index ee8b07540a..3fa1190b50 100644 --- a/network/src/blocks/blocks.rs +++ b/network/src/blocks/blocks.rs @@ -56,8 +56,7 @@ impl Blocks { if let (Some(sync_node), Ok(block_locator_hashes)) = (sync_node, block_locator_hashes) { // Send a GetSync to the selected sync node. self.outbound - .send_request(&Request::GetSync(sync_node, GetSync::new(block_locator_hashes))) - .await; + .send_request(&Request::GetSync(sync_node, GetSync::new(block_locator_hashes))); } else { // If no sync node is available, wait until peers have been established. info!("No sync node is registered, blocks could not be synced"); @@ -89,8 +88,7 @@ impl Blocks { if *remote_address != block_miner && *remote_address != local_address { // Send a `Block` message to the connected peer. self.outbound - .send_request(&Request::Block(*remote_address, Block::new(block_bytes.clone()))) - .await; + .send_request(&Request::Block(*remote_address, Block::new(block_bytes.clone()))); } } @@ -111,12 +109,10 @@ impl Blocks { for remote_address in connected_peers.keys() { if *remote_address != transaction_sender && *remote_address != local_address { // Send a `Transaction` message to the connected peer. - self.outbound - .send_request(&Request::Transaction( - *remote_address, - Transaction::new(transaction_bytes.clone()), - )) - .await; + self.outbound.send_request(&Request::Transaction( + *remote_address, + Transaction::new(transaction_bytes.clone()), + )); } } @@ -231,8 +227,7 @@ impl Blocks { if let Ok(block) = block { // Send a `SyncBlock` message to the connected peer. self.outbound - .send_request(&Request::SyncBlock(remote_address, SyncBlock::new(block.serialize()?))) - .await; + .send_request(&Request::SyncBlock(remote_address, SyncBlock::new(block.serialize()?))); } Ok(()) } @@ -256,8 +251,7 @@ impl Blocks { if !transactions.is_empty() { // Send a `MemoryPool` message to the connected peer. self.outbound - .send_request(&Request::MemoryPool(remote_address, MemoryPool::new(transactions))) - .await; + .send_request(&Request::MemoryPool(remote_address, MemoryPool::new(transactions))); } Ok(()) @@ -324,7 +318,7 @@ impl Blocks { }; // send a `Sync` message to the connected peer. - self.outbound.send_request(&Request::Sync(remote_address, sync)).await; + self.outbound.send_request(&Request::Sync(remote_address, sync)); Ok(()) } @@ -339,8 +333,7 @@ impl Blocks { // detect missing blocks and divergence in chain for now. for hash in block_hashes { self.outbound - .send_request(&Request::GetBlock(remote_address, GetBlock::new(hash))) - .await; + .send_request(&Request::GetBlock(remote_address, GetBlock::new(hash))); } } diff --git a/network/src/outbound/outbound.rs b/network/src/outbound/outbound.rs index 995aba7971..fd949ce40f 100644 --- a/network/src/outbound/outbound.rs +++ b/network/src/outbound/outbound.rs @@ -27,7 +27,6 @@ use std::{ }; use parking_lot::RwLock; -use tokio::{task, task::JoinHandle}; /// The map of remote addresses to their active write channels. type Channels = HashMap; @@ -125,16 +124,19 @@ impl Outbound { /// and attempts to send the given request to them. /// #[inline] - pub async fn send_request(&self, request: &Request) -> JoinHandle<()> { + pub fn send_request(&self, request: &Request) { let outbound = self.clone(); let request = request.clone(); + // issues related to spawning this task are unlikely and not interesting; + // it's the failures with `Outbound::send` that are important, and the're + // handled within that method tokio::spawn(async move { // Wait for authorization. outbound.authorize(&request).await; // Send the request. outbound.send(&request).await; - }) + }); } /// @@ -246,8 +248,8 @@ mod tests { use crate::{external::GetPeers, outbound::*, Channel}; use snarkos_testing::network::TcpServer; - use std::net::SocketAddr; - use tokio::net::TcpStream; + use std::{net::SocketAddr, time::Duration}; + use tokio::{net::TcpStream, time::sleep}; /// /// Returns a `Request` for testing. @@ -325,7 +327,8 @@ mod tests { assert!(!outbound.is_failure(&request)); // Send the request to the server. - outbound.send_request(&request).await.await.unwrap(); + outbound.send_request(&request); + sleep(Duration::from_millis(10)).await; // Check that the request succeeded. assert!(!outbound.is_pending(&request)); @@ -350,7 +353,8 @@ mod tests { assert!(!outbound.is_failure(&request)); // Send the request to the server. - outbound.send_request(&request).await.await.unwrap(); + outbound.send_request(&request); + sleep(Duration::from_millis(10)).await; // Check that the request succeeded. assert!(!outbound.is_pending(&request)); diff --git a/network/src/peers/peers.rs b/network/src/peers/peers.rs index be09317d76..88a3a582b8 100644 --- a/network/src/peers/peers.rs +++ b/network/src/peers/peers.rs @@ -354,15 +354,13 @@ impl Peers { // TODO (raychu86): Establish a formal node version. // Broadcast a `Version` message to the connected peer. - self.outbound - .send_request(&Request::Version(Version::new( - 1u64, - block_height, - nonce, - local_address, - remote_address, - ))) - .await; + self.outbound.send_request(&Request::Version(Version::new( + 1u64, + block_height, + nonce, + local_address, + remote_address, + ))); } else { // Case 2 - The remote address is not of a connected peer, proceed to disconnect. @@ -381,9 +379,7 @@ impl Peers { trace!("Sending GetPeers requests to connected peers"); for (remote_address, _) in self.connected_peers() { - self.outbound - .send_request(&Request::GetPeers(remote_address, GetPeers)) - .await; + self.outbound.send_request(&Request::GetPeers(remote_address, GetPeers)); // // Fetch the connection channel. // if let Some(channel) = self.get_channel(&remote_address) { @@ -463,13 +459,11 @@ impl Peers { ) -> Result<(), NetworkError> { // FIXME(ljedrz): it appears that Verack is not sent back in a 1:1 fashion if self.number_of_connected_peers() < self.environment.maximum_number_of_connected_peers() { - self.outbound - .send_request(&Request::Verack(Verack::new( - remote_version.nonce, - remote_version.receiver, /* local_address */ - remote_address, - ))) - .await; + self.outbound.send_request(&Request::Verack(Verack::new( + remote_version.nonce, + remote_version.receiver, /* local_address */ + remote_address, + ))); if !self.connected_peers().contains_key(&remote_address) { self.connecting_to_peer(remote_address, remote_version.nonce).await?; @@ -502,8 +496,7 @@ impl Peers { peers.push((peer_address, *peer_info.last_seen())); } self.outbound - .send_request(&Request::Peers(remote_address, PeersMessage::new(peers))) - .await; + .send_request(&Request::Peers(remote_address, PeersMessage::new(peers))); Ok(()) }