Skip to content

Commit

Permalink
refactor: make Outbound::send_request non-async
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Jan 11, 2021
1 parent bc149c4 commit f49dace
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 45 deletions.
27 changes: 10 additions & 17 deletions network/src/blocks/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl Blocks {
if let (Some(sync_node), Ok(block_locator_hashes)) = (sync_node, block_locator_hashes) {
// Send a GetSync to the selected sync node.
self.outbound
.send_request(&Request::GetSync(sync_node, GetSync::new(block_locator_hashes)))
.await;
.send_request(&Request::GetSync(sync_node, GetSync::new(block_locator_hashes)));
} else {
// If no sync node is available, wait until peers have been established.
info!("No sync node is registered, blocks could not be synced");
Expand Down Expand Up @@ -89,8 +88,7 @@ impl Blocks {
if *remote_address != block_miner && *remote_address != local_address {
// Send a `Block` message to the connected peer.
self.outbound
.send_request(&Request::Block(*remote_address, Block::new(block_bytes.clone())))
.await;
.send_request(&Request::Block(*remote_address, Block::new(block_bytes.clone())));
}
}

Expand All @@ -111,12 +109,10 @@ impl Blocks {
for remote_address in connected_peers.keys() {
if *remote_address != transaction_sender && *remote_address != local_address {
// Send a `Transaction` message to the connected peer.
self.outbound
.send_request(&Request::Transaction(
*remote_address,
Transaction::new(transaction_bytes.clone()),
))
.await;
self.outbound.send_request(&Request::Transaction(
*remote_address,
Transaction::new(transaction_bytes.clone()),
));
}
}

Expand Down Expand Up @@ -231,8 +227,7 @@ impl Blocks {
if let Ok(block) = block {
// Send a `SyncBlock` message to the connected peer.
self.outbound
.send_request(&Request::SyncBlock(remote_address, SyncBlock::new(block.serialize()?)))
.await;
.send_request(&Request::SyncBlock(remote_address, SyncBlock::new(block.serialize()?)));
}
Ok(())
}
Expand All @@ -256,8 +251,7 @@ impl Blocks {
if !transactions.is_empty() {
// Send a `MemoryPool` message to the connected peer.
self.outbound
.send_request(&Request::MemoryPool(remote_address, MemoryPool::new(transactions)))
.await;
.send_request(&Request::MemoryPool(remote_address, MemoryPool::new(transactions)));
}

Ok(())
Expand Down Expand Up @@ -324,7 +318,7 @@ impl Blocks {
};

// send a `Sync` message to the connected peer.
self.outbound.send_request(&Request::Sync(remote_address, sync)).await;
self.outbound.send_request(&Request::Sync(remote_address, sync));

Ok(())
}
Expand All @@ -339,8 +333,7 @@ impl Blocks {
// detect missing blocks and divergence in chain for now.
for hash in block_hashes {
self.outbound
.send_request(&Request::GetBlock(remote_address, GetBlock::new(hash)))
.await;
.send_request(&Request::GetBlock(remote_address, GetBlock::new(hash)));
}
}

Expand Down
18 changes: 11 additions & 7 deletions network/src/outbound/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::{
};

use parking_lot::RwLock;
use tokio::{task, task::JoinHandle};

/// The map of remote addresses to their active write channels.
type Channels = HashMap<SocketAddr, Channel>;
Expand Down Expand Up @@ -125,16 +124,19 @@ impl Outbound {
/// and attempts to send the given request to them.
///
#[inline]
pub async fn send_request(&self, request: &Request) -> JoinHandle<()> {
pub fn send_request(&self, request: &Request) {
let outbound = self.clone();
let request = request.clone();

// issues related to spawning this task are unlikely and not interesting;
// it's the failures with `Outbound::send` that are important, and the're
// handled within that method
tokio::spawn(async move {
// Wait for authorization.
outbound.authorize(&request).await;
// Send the request.
outbound.send(&request).await;
})
});
}

///
Expand Down Expand Up @@ -246,8 +248,8 @@ mod tests {
use crate::{external::GetPeers, outbound::*, Channel};
use snarkos_testing::network::TcpServer;

use std::net::SocketAddr;
use tokio::net::TcpStream;
use std::{net::SocketAddr, time::Duration};
use tokio::{net::TcpStream, time::sleep};

///
/// Returns a `Request` for testing.
Expand Down Expand Up @@ -325,7 +327,8 @@ mod tests {
assert!(!outbound.is_failure(&request));

// Send the request to the server.
outbound.send_request(&request).await.await.unwrap();
outbound.send_request(&request);
sleep(Duration::from_millis(10)).await;

// Check that the request succeeded.
assert!(!outbound.is_pending(&request));
Expand All @@ -350,7 +353,8 @@ mod tests {
assert!(!outbound.is_failure(&request));

// Send the request to the server.
outbound.send_request(&request).await.await.unwrap();
outbound.send_request(&request);
sleep(Duration::from_millis(10)).await;

// Check that the request succeeded.
assert!(!outbound.is_pending(&request));
Expand Down
35 changes: 14 additions & 21 deletions network/src/peers/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,13 @@ impl Peers {

// TODO (raychu86): Establish a formal node version.
// Broadcast a `Version` message to the connected peer.
self.outbound
.send_request(&Request::Version(Version::new(
1u64,
block_height,
nonce,
local_address,
remote_address,
)))
.await;
self.outbound.send_request(&Request::Version(Version::new(
1u64,
block_height,
nonce,
local_address,
remote_address,
)));
} else {
// Case 2 - The remote address is not of a connected peer, proceed to disconnect.

Expand All @@ -381,9 +379,7 @@ impl Peers {
trace!("Sending GetPeers requests to connected peers");

for (remote_address, _) in self.connected_peers() {
self.outbound
.send_request(&Request::GetPeers(remote_address, GetPeers))
.await;
self.outbound.send_request(&Request::GetPeers(remote_address, GetPeers));

// // Fetch the connection channel.
// if let Some(channel) = self.get_channel(&remote_address) {
Expand Down Expand Up @@ -463,13 +459,11 @@ impl Peers {
) -> Result<(), NetworkError> {
// FIXME(ljedrz): it appears that Verack is not sent back in a 1:1 fashion
if self.number_of_connected_peers() < self.environment.maximum_number_of_connected_peers() {
self.outbound
.send_request(&Request::Verack(Verack::new(
remote_version.nonce,
remote_version.receiver, /* local_address */
remote_address,
)))
.await;
self.outbound.send_request(&Request::Verack(Verack::new(
remote_version.nonce,
remote_version.receiver, /* local_address */
remote_address,
)));

if !self.connected_peers().contains_key(&remote_address) {
self.connecting_to_peer(remote_address, remote_version.nonce).await?;
Expand Down Expand Up @@ -502,8 +496,7 @@ impl Peers {
peers.push((peer_address, *peer_info.last_seen()));
}
self.outbound
.send_request(&Request::Peers(remote_address, PeersMessage::new(peers)))
.await;
.send_request(&Request::Peers(remote_address, PeersMessage::new(peers)));

Ok(())
}
Expand Down

0 comments on commit f49dace

Please sign in to comment.