diff --git a/network/src/blocks/blocks.rs b/network/src/blocks/blocks.rs index 939f1f3cb1..577efd8f1a 100644 --- a/network/src/blocks/blocks.rs +++ b/network/src/blocks/blocks.rs @@ -40,7 +40,7 @@ impl Blocks { /// /// Broadcasts updates with connected peers and maintains a permitted number of connected peers. /// - pub async fn update(&self, sync_node: Option) -> Result<(), NetworkError> { + pub async fn update(&self, sync_node: Option) { // Check that this node is not a bootnode. if !self.environment.is_bootnode() { let block_locator_hashes = self.environment.storage().read().get_block_locator_hashes(); @@ -56,8 +56,6 @@ impl Blocks { info!("No sync node is registered, blocks could not be synced"); } } - - Ok(()) } /// @@ -74,7 +72,7 @@ impl Blocks { block_bytes: Vec, block_miner: SocketAddr, connected_peers: &HashMap, - ) -> Result<(), NetworkError> { + ) { debug!("Propagating a block to peers"); let local_address = self.local_address(); @@ -87,8 +85,6 @@ impl Blocks { )); } } - - Ok(()) } /// A peer has sent us a new block to process. @@ -126,7 +122,7 @@ impl Blocks { // This is a new block, send it to our peers. if let Some(connected_peers) = connected_peers { if is_new_block { - self.propagate_block(block, remote_address, &connected_peers).await?; + self.propagate_block(block, remote_address, &connected_peers).await; } } } @@ -198,11 +194,7 @@ impl Blocks { } /// A peer has sent us their chain state. - pub(crate) async fn received_sync( - &self, - remote_address: SocketAddr, - block_hashes: Vec, - ) -> Result<(), NetworkError> { + pub(crate) async fn received_sync(&self, remote_address: SocketAddr, block_hashes: Vec) { // If empty sync is no-op as chain states match if !block_hashes.is_empty() { // GetBlocks for each block hash: fire and forget, relying on block locator hashes to @@ -214,7 +206,5 @@ impl Blocks { )); } } - - Ok(()) } } diff --git a/network/src/external/channel.rs b/network/src/external/channel.rs index 7996ddc12f..6d0e4bc193 100644 --- a/network/src/external/channel.rs +++ b/network/src/external/channel.rs @@ -53,20 +53,15 @@ impl Channel { Ok(Channel::new(remote_address, stream)) } - /// Returns a new channel with the specified address and new writer stream. - pub async fn update_address(self, remote_address: SocketAddr) -> Result { - Ok(Self { - remote_address, - writer: self.writer, - }) + /// Updates the address associated with the given channel. + pub async fn update_address(&mut self, remote_address: SocketAddr) { + self.remote_address = remote_address; } /// Writes a message header + payload. pub async fn write(&self, payload: &Payload) -> Result<(), ConnectError> { let serialized_payload = bincode::serialize(payload).map_err(|e| ConnectError::MessageError(e.into()))?; - let header = MessageHeader { - len: serialized_payload.len() as u32, - }; + let header = MessageHeader::from(serialized_payload.len()); { let mut writer = self.writer.lock().await; diff --git a/network/src/external/message/read.rs b/network/src/external/message/read.rs index 45f820f696..3108494a9a 100644 --- a/network/src/external/message/read.rs +++ b/network/src/external/message/read.rs @@ -35,7 +35,7 @@ pub async fn read_header(mut stream: &mut T) -> Result MAX_MESSAGE_SIZE { - return Err(MessageHeaderError::TooBig(header.len as usize, MAX_MESSAGE_SIZE)); + Err(MessageHeaderError::TooBig(header.len as usize, MAX_MESSAGE_SIZE)) } else { Ok(header) } diff --git a/network/src/inbound/inbound.rs b/network/src/inbound/inbound.rs index fa0ae12c5c..d58863fc38 100644 --- a/network/src/inbound/inbound.rs +++ b/network/src/inbound/inbound.rs @@ -226,7 +226,7 @@ impl Inbound { stream: TcpStream, ) -> Result<(Channel, OwnedReadHalf), NetworkError> { // Register the new channel. - let (channel, mut reader) = Channel::new(remote_address, stream); + let (mut channel, mut reader) = Channel::new(remote_address, stream); let mut handshake_buffer = [0u8; 64]; @@ -245,7 +245,7 @@ impl Inbound { // Create the remote address from the given peer address, and specified port from the version message. let remote_address = SocketAddr::new(remote_address.ip(), remote_version.listening_port); - let channel = channel.update_address(remote_address).await?; + channel.update_address(remote_address).await; // TODO (raychu86): Establish a formal node version. let local_version = Version::new_with_rng(1u64, block_height, listener_address.port()); diff --git a/network/src/lib.rs b/network/src/lib.rs index 6215d48e0f..72cdae54a7 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -115,7 +115,7 @@ impl Server { Ok(()) } - pub async fn start_services(&self) -> Result<(), NetworkError> { + pub async fn start_services(&self) { let peer_sync_interval = self.environment.peer_sync_interval(); let peers = self.peers.clone(); @@ -145,9 +145,7 @@ impl Server { // select last seen node as block sync node let sync_node = peers.last_seen(); - if let Err(e) = blocks.update(sync_node).await { - error!("Block update error: {}", e); - } + blocks.update(sync_node).await; } }); @@ -175,14 +173,12 @@ impl Server { } } }); - - Ok(()) } pub async fn start(&mut self) -> Result<(), NetworkError> { debug!("Initializing the connection server"); self.establish_address().await?; - self.start_services().await?; + self.start_services().await; debug!("Connection server initialized"); Ok(()) @@ -246,7 +242,7 @@ impl Server { self.blocks.received_get_sync(source.unwrap(), getsync).await?; } Payload::Sync(sync) => { - self.blocks.received_sync(source.unwrap(), sync).await?; + self.blocks.received_sync(source.unwrap(), sync).await; } Payload::Disconnect(addr) => { if direction == Direction::Internal { diff --git a/network/src/peers/peers.rs b/network/src/peers/peers.rs index c4ed203204..c86062ff47 100644 --- a/network/src/peers/peers.rs +++ b/network/src/peers/peers.rs @@ -89,10 +89,10 @@ impl Peers { // Check if this node server is below the permitted number of connected peers. if number_of_connected_peers < self.environment.minimum_number_of_connected_peers() { // Attempt to connect to the default bootnodes of the network. - self.connect_to_bootnodes().await?; + self.connect_to_bootnodes().await; // Attempt to connect to each disconnected peer saved in the peer book. - self.connect_to_disconnected_peers().await?; + self.connect_to_disconnected_peers().await; // Broadcast a `GetPeers` message to request for more peers. self.broadcast_getpeers_requests(); @@ -127,7 +127,7 @@ impl Peers { if number_of_connected_peers != 0 { // Broadcast a `Version` request to each connected peer. - self.broadcast_version_requests()?; + self.broadcast_version_requests(); // Store the peer book to storage. self.save_peer_book_to_storage()?; @@ -299,7 +299,7 @@ impl Peers { /// /// This function filters out any bootnode peers the node server is already connected to. /// - async fn connect_to_bootnodes(&self) -> Result<(), NetworkError> { + async fn connect_to_bootnodes(&self) { trace!("Connecting to default bootnodes"); // Fetch the current connected peers of this node. @@ -312,26 +312,26 @@ impl Peers { .iter() .filter(|addr| !connected_peers.contains_key(addr)) { - self.initiate_connection(*bootnode_address).await?; + if let Err(e) = self.initiate_connection(*bootnode_address).await { + warn!("Couldn't connect to bootnode {}: {}", bootnode_address, e); + } } - - Ok(()) } /// Broadcasts a connection request to all disconnected peers. - async fn connect_to_disconnected_peers(&self) -> Result<(), NetworkError> { + async fn connect_to_disconnected_peers(&self) { trace!("Connecting to disconnected peers"); // Iterate through each connected peer and attempts a connection request. for (remote_address, _) in self.disconnected_peers() { - self.initiate_connection(remote_address).await?; + if let Err(e) = self.initiate_connection(remote_address).await { + warn!("Couldn't connect to the disconnected peer {}: {}", remote_address, e); + } } - - Ok(()) } /// Broadcasts a `Version` message to all connected peers. - fn broadcast_version_requests(&self) -> Result<(), NetworkError> { + fn broadcast_version_requests(&self) { // Get the local address of this node. let local_address = self.local_address().unwrap(); // must be known by now // Fetch the current block height of this node. @@ -355,11 +355,11 @@ impl Peers { // Disconnect from the peer if there is no active connection channel // TODO (howardwu): Inform Outbound to also disconnect, by dropping any channels held with this peer. - self.disconnected_from_peer(&remote_address)?; + if let Err(e) = self.disconnected_from_peer(&remote_address) { + warn!("Couldn't mark {} as disconnected: {}", remote_address, e); + } }; } - - Ok(()) } /// Broadcasts a `GetPeers` message to all connected peers to request for more peers. diff --git a/snarkos/main.rs b/snarkos/main.rs index 5f51ada53a..15f28cdc37 100644 --- a/snarkos/main.rs +++ b/snarkos/main.rs @@ -190,7 +190,7 @@ async fn start_server(config: Config) -> anyhow::Result<()> { } // Start the server - server.start_services().await?; + server.start_services().await; stream::pending::<()>().next().await; diff --git a/snarkos/miner.rs b/snarkos/miner.rs index 556ad5a964..33a6f923f7 100644 --- a/snarkos/miner.rs +++ b/snarkos/miner.rs @@ -99,14 +99,10 @@ impl MinerInstance { info!("Mined a new block!\t{:?}", hex::encode(block.header.get_hash().0)); let peers = self.node_server.peers.connected_peers(); - if let Err(err) = self - .node_server + self.node_server .blocks .propagate_block(block_serialized, local_address, &peers) - .await - { - error!("Error propagating block to peers: {:?}", err); - } + .await; } Err(_) => continue, }