Skip to content

Commit

Permalink
fmt: a surprisingly large subsequent cargo fmt change
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Feb 12, 2022
1 parent 2e3ffbf commit e94d513
Showing 1 changed file with 109 additions and 110 deletions.
219 changes: 109 additions & 110 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,126 +183,125 @@ impl<N: Network, E: Environment> Peer<N, E> {
outbound_socket.send(message).await?;

// Wait for the counterparty challenge request to come in.
let (peer_nonce, node_type, status) =
match outbound_socket.next().await {
Some(Ok(message)) => {
// Process the message.
trace!("Received '{}-B' from {}", message.name(), peer_ip);
match message {
Message::ChallengeRequest(
version,
fork_depth,
node_type,
peer_status,
listener_port,
peer_nonce,
peer_cumulative_weight,
) => {
// Ensure the message protocol version is not outdated.
if version < E::MESSAGE_VERSION {
warn!("Dropping {} on version {} (outdated)", peer_ip, version);

// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::OutdatedClientVersion);
outbound_socket.send(message).await?;

return Err(anyhow!("Dropping {} on version {} (outdated)", peer_ip, version));
}
// Ensure the maximum fork depth is correct.
if fork_depth != N::ALEO_MAXIMUM_FORK_DEPTH {
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::InvalidForkDepth);
outbound_socket.send(message).await?;

return Err(anyhow!(
"Dropping {} for an incorrect maximum fork depth of {}",
peer_ip,
fork_depth
));
}
// If this node is not a sync node and is syncing, the peer is a sync node, and this node is ahead, proceed to disconnect.
if E::NODE_TYPE != NodeType::Sync
&& E::status().is_syncing()
&& node_type == NodeType::Sync
&& local_cumulative_weight > peer_cumulative_weight
{
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::YouNeedToSyncFirst);
outbound_socket.send(message).await?;

return Err(anyhow!("Dropping {} as this node is ahead", peer_ip));
}
// If this node is a sync node, the peer is not a sync node and is syncing, and the peer is ahead, proceed to disconnect.
if E::NODE_TYPE == NodeType::Sync
&& node_type != NodeType::Sync
&& peer_status == State::Syncing
&& peer_cumulative_weight > local_cumulative_weight
{
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::INeedToSyncFirst);
outbound_socket.send(message).await?;

return Err(anyhow!("Dropping {} as this node is ahead", peer_ip));
}
// Ensure the peer is not this node.
if local_nonce == peer_nonce {
return Err(anyhow!("Attempted to connect to self (nonce = {})", peer_nonce));
}
// Ensure the peer is not already connected to this node.
if connected_nonces.contains(&peer_nonce) {
return Err(anyhow!("Already connected to a peer with nonce {}", peer_nonce));
}
// Verify the listener port.
if peer_ip.port() != listener_port {
// Update the peer IP to the listener port.
peer_ip.set_port(listener_port);
let (peer_nonce, node_type, status) = match outbound_socket.next().await {
Some(Ok(message)) => {
// Process the message.
trace!("Received '{}-B' from {}", message.name(), peer_ip);
match message {
Message::ChallengeRequest(
version,
fork_depth,
node_type,
peer_status,
listener_port,
peer_nonce,
peer_cumulative_weight,
) => {
// Ensure the message protocol version is not outdated.
if version < E::MESSAGE_VERSION {
warn!("Dropping {} on version {} (outdated)", peer_ip, version);

// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::OutdatedClientVersion);
outbound_socket.send(message).await?;

// This check needs to be excluded from network integration tests.
#[cfg(not(feature = "test"))]
{
// Ensure the claimed listener port is open.
let _ = match timeout(Duration::from_millis(E::CONNECTION_TIMEOUT_IN_MILLIS), TcpStream::connect(peer_ip))
.await
{
Ok(stream) => stream,
Err(error) => {
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::YourPortIsClosed(listener_port));
outbound_socket.send(message).await?;
return Err(anyhow!("Dropping {} on version {} (outdated)", peer_ip, version));
}
// Ensure the maximum fork depth is correct.
if fork_depth != N::ALEO_MAXIMUM_FORK_DEPTH {
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::InvalidForkDepth);
outbound_socket.send(message).await?;

return Err(anyhow!("Unable to reach '{}': '{:?}'", peer_ip, error));
},
};
}
}
// Send the challenge response.
let message = Message::ChallengeResponse(Data::Object(genesis_header.clone()));
trace!("Sending '{}-B' to {}", message.name(), peer_ip);
return Err(anyhow!(
"Dropping {} for an incorrect maximum fork depth of {}",
peer_ip,
fork_depth
));
}
// If this node is not a sync node and is syncing, the peer is a sync node, and this node is ahead, proceed to disconnect.
if E::NODE_TYPE != NodeType::Sync
&& E::status().is_syncing()
&& node_type == NodeType::Sync
&& local_cumulative_weight > peer_cumulative_weight
{
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::YouNeedToSyncFirst);
outbound_socket.send(message).await?;

// Initialize a status variable.
let status = Status::new();
status.update(peer_status);
return Err(anyhow!("Dropping {} as this node is ahead", peer_ip));
}
// If this node is a sync node, the peer is not a sync node and is syncing, and the peer is ahead, proceed to disconnect.
if E::NODE_TYPE == NodeType::Sync
&& node_type != NodeType::Sync
&& peer_status == State::Syncing
&& peer_cumulative_weight > local_cumulative_weight
{
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::INeedToSyncFirst);
outbound_socket.send(message).await?;

(peer_nonce, node_type, status)
return Err(anyhow!("Dropping {} as this node is ahead", peer_ip));
}
Message::Disconnect(reason) => {
bail!("Peer {} disconnected for the following reason: {:?}", peer_ip, reason);
// Ensure the peer is not this node.
if local_nonce == peer_nonce {
return Err(anyhow!("Attempted to connect to self (nonce = {})", peer_nonce));
}
message => {
return Err(anyhow!(
"Expected challenge request, received '{}' from {}",
message.name(),
peer_ip
));
// Ensure the peer is not already connected to this node.
if connected_nonces.contains(&peer_nonce) {
return Err(anyhow!("Already connected to a peer with nonce {}", peer_nonce));
}
// Verify the listener port.
if peer_ip.port() != listener_port {
// Update the peer IP to the listener port.
peer_ip.set_port(listener_port);

// This check needs to be excluded from network integration tests.
#[cfg(not(feature = "test"))]
{
// Ensure the claimed listener port is open.
let _ = match timeout(Duration::from_millis(E::CONNECTION_TIMEOUT_IN_MILLIS), TcpStream::connect(peer_ip))
.await
{
Ok(stream) => stream,
Err(error) => {
// Send the disconnect message.
let message = Message::Disconnect(DisconnectReason::YourPortIsClosed(listener_port));
outbound_socket.send(message).await?;

return Err(anyhow!("Unable to reach '{}': '{:?}'", peer_ip, error));
}
};
}
}
// Send the challenge response.
let message = Message::ChallengeResponse(Data::Object(genesis_header.clone()));
trace!("Sending '{}-B' to {}", message.name(), peer_ip);
outbound_socket.send(message).await?;

// Initialize a status variable.
let status = Status::new();
status.update(peer_status);

(peer_nonce, node_type, status)
}
Message::Disconnect(reason) => {
bail!("Peer {} disconnected for the following reason: {:?}", peer_ip, reason);
}
message => {
return Err(anyhow!(
"Expected challenge request, received '{}' from {}",
message.name(),
peer_ip
));
}
}
// An error occurred.
Some(Err(error)) => return Err(anyhow!("Failed to get challenge request from {}: {:?}", peer_ip, error)),
// Did not receive anything.
None => return Err(anyhow!("Dropped prior to challenge request of {}", peer_ip)),
};
}
// An error occurred.
Some(Err(error)) => return Err(anyhow!("Failed to get challenge request from {}: {:?}", peer_ip, error)),
// Did not receive anything.
None => return Err(anyhow!("Dropped prior to challenge request of {}", peer_ip)),
};

// Wait for the challenge response to come in.
match outbound_socket.next().await {
Expand Down

0 comments on commit e94d513

Please sign in to comment.