Skip to content

Commit

Permalink
refactor: don't return a Result in infallible methods
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Jan 15, 2021
1 parent 7cf4333 commit 61b38a7
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 56 deletions.
18 changes: 4 additions & 14 deletions network/src/blocks/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Blocks {
///
/// Broadcasts updates with connected peers and maintains a permitted number of connected peers.
///
pub async fn update(&self, sync_node: Option<SocketAddr>) -> Result<(), NetworkError> {
pub async fn update(&self, sync_node: Option<SocketAddr>) {
// Check that this node is not a bootnode.
if !self.environment.is_bootnode() {
let block_locator_hashes = self.environment.storage().read().get_block_locator_hashes();
Expand All @@ -56,8 +56,6 @@ impl Blocks {
info!("No sync node is registered, blocks could not be synced");
}
}

Ok(())
}

///
Expand All @@ -74,7 +72,7 @@ impl Blocks {
block_bytes: Vec<u8>,
block_miner: SocketAddr,
connected_peers: &HashMap<SocketAddr, PeerInfo>,
) -> Result<(), NetworkError> {
) {
debug!("Propagating a block to peers");

let local_address = self.local_address();
Expand All @@ -87,8 +85,6 @@ impl Blocks {
));
}
}

Ok(())
}

/// A peer has sent us a new block to process.
Expand Down Expand Up @@ -126,7 +122,7 @@ impl Blocks {
// This is a new block, send it to our peers.
if let Some(connected_peers) = connected_peers {
if is_new_block {
self.propagate_block(block, remote_address, &connected_peers).await?;
self.propagate_block(block, remote_address, &connected_peers).await;
}
}
}
Expand Down Expand Up @@ -198,11 +194,7 @@ impl Blocks {
}

/// A peer has sent us their chain state.
pub(crate) async fn received_sync(
&self,
remote_address: SocketAddr,
block_hashes: Vec<BlockHeaderHash>,
) -> Result<(), NetworkError> {
pub(crate) async fn received_sync(&self, remote_address: SocketAddr, block_hashes: Vec<BlockHeaderHash>) {
// If empty sync is no-op as chain states match
if !block_hashes.is_empty() {
// GetBlocks for each block hash: fire and forget, relying on block locator hashes to
Expand All @@ -214,7 +206,5 @@ impl Blocks {
));
}
}

Ok(())
}
}
13 changes: 4 additions & 9 deletions network/src/external/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,15 @@ impl Channel {
Ok(Channel::new(remote_address, stream))
}

/// Returns a new channel with the specified address and new writer stream.
pub async fn update_address(self, remote_address: SocketAddr) -> Result<Self, ConnectError> {
Ok(Self {
remote_address,
writer: self.writer,
})
/// Updates the address associated with the given channel.
pub async fn update_address(&mut self, remote_address: SocketAddr) {
self.remote_address = remote_address;
}

/// Writes a message header + payload.
pub async fn write(&self, payload: &Payload) -> Result<(), ConnectError> {
let serialized_payload = bincode::serialize(payload).map_err(|e| ConnectError::MessageError(e.into()))?;
let header = MessageHeader {
len: serialized_payload.len() as u32,
};
let header = MessageHeader::from(serialized_payload.len());

{
let mut writer = self.writer.lock().await;
Expand Down
2 changes: 1 addition & 1 deletion network/src/external/message/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub async fn read_header<T: AsyncRead + Unpin>(mut stream: &mut T) -> Result<Mes
let header = MessageHeader::from(header_arr);

if header.len as usize > MAX_MESSAGE_SIZE {
return Err(MessageHeaderError::TooBig(header.len as usize, MAX_MESSAGE_SIZE));
Err(MessageHeaderError::TooBig(header.len as usize, MAX_MESSAGE_SIZE))
} else {
Ok(header)
}
Expand Down
4 changes: 2 additions & 2 deletions network/src/inbound/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl Inbound {
stream: TcpStream,
) -> Result<(Channel, OwnedReadHalf), NetworkError> {
// Register the new channel.
let (channel, mut reader) = Channel::new(remote_address, stream);
let (mut channel, mut reader) = Channel::new(remote_address, stream);

let mut handshake_buffer = [0u8; 64];

Expand All @@ -245,7 +245,7 @@ impl Inbound {
// Create the remote address from the given peer address, and specified port from the version message.
let remote_address = SocketAddr::new(remote_address.ip(), remote_version.listening_port);

let channel = channel.update_address(remote_address).await?;
channel.update_address(remote_address).await;

// TODO (raychu86): Establish a formal node version.
let local_version = Version::new_with_rng(1u64, block_height, listener_address.port());
Expand Down
12 changes: 4 additions & 8 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl Server {
Ok(())
}

pub async fn start_services(&self) -> Result<(), NetworkError> {
pub async fn start_services(&self) {
let peer_sync_interval = self.environment.peer_sync_interval();
let peers = self.peers.clone();

Expand Down Expand Up @@ -145,9 +145,7 @@ impl Server {
// select last seen node as block sync node
let sync_node = peers.last_seen();

if let Err(e) = blocks.update(sync_node).await {
error!("Block update error: {}", e);
}
blocks.update(sync_node).await;
}
});

Expand Down Expand Up @@ -175,14 +173,12 @@ impl Server {
}
}
});

Ok(())
}

pub async fn start(&mut self) -> Result<(), NetworkError> {
debug!("Initializing the connection server");
self.establish_address().await?;
self.start_services().await?;
self.start_services().await;
debug!("Connection server initialized");

Ok(())
Expand Down Expand Up @@ -246,7 +242,7 @@ impl Server {
self.blocks.received_get_sync(source.unwrap(), getsync).await?;
}
Payload::Sync(sync) => {
self.blocks.received_sync(source.unwrap(), sync).await?;
self.blocks.received_sync(source.unwrap(), sync).await;
}
Payload::Disconnect(addr) => {
if direction == Direction::Internal {
Expand Down
30 changes: 15 additions & 15 deletions network/src/peers/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ impl Peers {
// Check if this node server is below the permitted number of connected peers.
if number_of_connected_peers < self.environment.minimum_number_of_connected_peers() {
// Attempt to connect to the default bootnodes of the network.
self.connect_to_bootnodes().await?;
self.connect_to_bootnodes().await;

// Attempt to connect to each disconnected peer saved in the peer book.
self.connect_to_disconnected_peers().await?;
self.connect_to_disconnected_peers().await;

// Broadcast a `GetPeers` message to request for more peers.
self.broadcast_getpeers_requests();
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Peers {

if number_of_connected_peers != 0 {
// Broadcast a `Version` request to each connected peer.
self.broadcast_version_requests()?;
self.broadcast_version_requests();

// Store the peer book to storage.
self.save_peer_book_to_storage()?;
Expand Down Expand Up @@ -299,7 +299,7 @@ impl Peers {
///
/// This function filters out any bootnode peers the node server is already connected to.
///
async fn connect_to_bootnodes(&self) -> Result<(), NetworkError> {
async fn connect_to_bootnodes(&self) {
trace!("Connecting to default bootnodes");

// Fetch the current connected peers of this node.
Expand All @@ -312,26 +312,26 @@ impl Peers {
.iter()
.filter(|addr| !connected_peers.contains_key(addr))
{
self.initiate_connection(*bootnode_address).await?;
if let Err(e) = self.initiate_connection(*bootnode_address).await {
warn!("Couldn't connect to bootnode {}: {}", bootnode_address, e);
}
}

Ok(())
}

/// Broadcasts a connection request to all disconnected peers.
async fn connect_to_disconnected_peers(&self) -> Result<(), NetworkError> {
async fn connect_to_disconnected_peers(&self) {
trace!("Connecting to disconnected peers");

// Iterate through each connected peer and attempts a connection request.
for (remote_address, _) in self.disconnected_peers() {
self.initiate_connection(remote_address).await?;
if let Err(e) = self.initiate_connection(remote_address).await {
warn!("Couldn't connect to the disconnected peer {}: {}", remote_address, e);
}
}

Ok(())
}

/// Broadcasts a `Version` message to all connected peers.
fn broadcast_version_requests(&self) -> Result<(), NetworkError> {
fn broadcast_version_requests(&self) {
// Get the local address of this node.
let local_address = self.local_address().unwrap(); // must be known by now
// Fetch the current block height of this node.
Expand All @@ -355,11 +355,11 @@ impl Peers {

// Disconnect from the peer if there is no active connection channel
// TODO (howardwu): Inform Outbound to also disconnect, by dropping any channels held with this peer.
self.disconnected_from_peer(&remote_address)?;
if let Err(e) = self.disconnected_from_peer(&remote_address) {
warn!("Couldn't mark {} as disconnected: {}", remote_address, e);
}
};
}

Ok(())
}

/// Broadcasts a `GetPeers` message to all connected peers to request for more peers.
Expand Down
2 changes: 1 addition & 1 deletion snarkos/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
}

// Start the server
server.start_services().await?;
server.start_services().await;

stream::pending::<()>().next().await;

Expand Down
8 changes: 2 additions & 6 deletions snarkos/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,10 @@ impl MinerInstance {
info!("Mined a new block!\t{:?}", hex::encode(block.header.get_hash().0));
let peers = self.node_server.peers.connected_peers();

if let Err(err) = self
.node_server
self.node_server
.blocks
.propagate_block(block_serialized, local_address, &peers)
.await
{
error!("Error propagating block to peers: {:?}", err);
}
.await;
}
Err(_) => continue,
}
Expand Down

0 comments on commit 61b38a7

Please sign in to comment.