Skip to content

Commit

Permalink
Merge pull request ProvableHQ#2902 from niklaslong/fix/2894-unsolicit…
Browse files Browse the repository at this point in the history
…ed-peer-response

Reject unsolicited `PeerResponse` messages
  • Loading branch information
howardwu authored Dec 27, 2023
2 parents f0e5ffd + 8c5a262 commit 767afbf
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 170 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ version = "0.1"
[dev-dependencies.deadline]
version = "0.2"

[dev-dependencies.paste]
version = "1"

[dev-dependencies.pea2pea]
version = "0.46"

Expand Down
49 changes: 49 additions & 0 deletions node/router/src/helpers/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct Cache<N: Network> {
seen_outbound_solutions: RwLock<LinkedHashMap<SolutionKey<N>, OffsetDateTime>>,
/// The map of transaction IDs to their last seen timestamp.
seen_outbound_transactions: RwLock<LinkedHashMap<TransactionKey<N>, OffsetDateTime>>,
/// The map of peer IPs to the number of sent peer requests.
seen_outbound_peer_requests: RwLock<IndexMap<SocketAddr, u32>>,
}

impl<N: Network> Default for Cache<N> {
Expand All @@ -75,6 +77,7 @@ impl<N: Network> Cache<N> {
seen_outbound_puzzle_requests: Default::default(),
seen_outbound_solutions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_outbound_transactions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_outbound_peer_requests: Default::default(),
}
}
}
Expand Down Expand Up @@ -166,6 +169,21 @@ impl<N: Network> Cache<N> {
) -> Option<OffsetDateTime> {
Self::refresh_and_insert(&self.seen_outbound_transactions, (peer_ip, transaction))
}

/// Returns `true` if the cache contains a peer request from the given peer.
pub fn contains_outbound_peer_request(&self, peer_ip: SocketAddr) -> bool {
self.seen_outbound_peer_requests.read().get(&peer_ip).map(|r| *r > 0).unwrap_or(false)
}

/// Increment the peer IP's number of peer requests, returning the updated number of peer requests.
pub fn increment_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::increment_counter(&self.seen_outbound_peer_requests, peer_ip)
}

/// Decrement the peer IP's number of peer requests, returning the updated number of peer requests.
pub fn decrement_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::decrement_counter(&self.seen_outbound_peer_requests, peer_ip)
}
}

impl<N: Network> Cache<N> {
Expand Down Expand Up @@ -336,4 +354,35 @@ mod tests {
// Check that the cache still contains the transaction.
assert_eq!(cache.seen_outbound_transactions.read().len(), 1);
}

#[test]
fn test_outbound_peer_request() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);

// Check the cache is empty.
assert!(cache.seen_outbound_peer_requests.read().is_empty());
assert!(!cache.contains_outbound_peer_request(peer_ip));

// Increment the peer requests.
assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 1);

// Check the cache contains the peer request.
assert!(cache.contains_outbound_peer_request(peer_ip));

// Increment the peer requests again for the same peer IP.
assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 2);

// Check the cache still contains the peer request.
assert!(cache.contains_outbound_peer_request(peer_ip));

// Decrement the peer requests.
assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 1);

// Decrement the peer requests again.
assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 0);

// Check the cache is empty.
assert!(!cache.contains_outbound_peer_request(peer_ip));
}
}
14 changes: 10 additions & 4 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,16 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
true => Ok(()),
false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
},
Message::PeerResponse(message) => match self.peer_response(peer_ip, &message.peers) {
true => Ok(()),
false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
},
Message::PeerResponse(message) => {
if !self.router().cache.contains_outbound_peer_request(peer_ip) {
bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
}

match self.peer_response(peer_ip, &message.peers) {
true => Ok(()),
false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
}
}
Message::Ping(message) => {
// Ensure the message protocol version is not outdated.
if message.version < Message::<N>::VERSION {
Expand Down
1 change: 0 additions & 1 deletion node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ mod helpers;
pub use helpers::*;

mod handshake;
pub use handshake::*;

mod heartbeat;
pub use heartbeat::*;
Expand Down
4 changes: 4 additions & 0 deletions node/router/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
if matches!(message, Message::PuzzleRequest(_)) {
self.router().cache.increment_outbound_puzzle_requests(peer_ip);
}
// If the message type is a peer request, increment the cache.
if matches!(message, Message::PeerRequest(_)) {
self.router().cache.increment_outbound_peer_requests(peer_ip);
}
// Retrieve the message name.
let name = message.name();
// Send the message to the peer.
Expand Down
191 changes: 191 additions & 0 deletions node/tests/disconnect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![recursion_limit = "256"]

#[allow(dead_code)]
mod common;
use common::{node::*, test_peer::TestPeer};

use snarkos_node_router::Outbound;
use snarkos_node_tcp::P2P;

use deadline::deadline;
use std::time::Duration;

// Macro to simply construct disconnect cases.
// Syntax:
// - (full_node |> test_peer): full node disconnects from the synthetic test peer.
// - (full_node <| test_peer): synthetic test peer disconnects from the full node.
//
// Test naming: full_node::handshake_<node or peer>_side::test_peer.
macro_rules! test_disconnect {
($node_type:ident, $peer_type:ident, $node_disconnects:expr, $($attr:meta)?) => {
#[tokio::test]
$(#[$attr])?
async fn $peer_type() {
use deadline::deadline;
use pea2pea::Pea2Pea;
use snarkos_node_router::Outbound;
use snarkos_node_tcp::P2P;
use std::time::Duration;

// $crate::common::initialise_logger(2);

// Spin up a full node.
let node = $crate::$node_type().await;

// Spin up a test peer (synthetic node).
let peer = $crate::TestPeer::$peer_type().await;
let peer_addr = peer.node().listening_addr().unwrap();

// Connect the node to the test peer.
node.router().connect(peer_addr).unwrap().await.unwrap();

// Check the peer counts.
let node_clone = node.clone();
deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 1);
let node_clone = node.clone();
deadline!(Duration::from_secs(5), move || node_clone.tcp().num_connected() == 1);
let peer_clone = peer.clone();
deadline!(Duration::from_secs(5), move || peer_clone.node().num_connected() == 1);

// Disconnect.
if $node_disconnects {
node.router().disconnect(node.tcp().connected_addrs()[0]).await.unwrap();
} else {
peer.node().disconnect(peer.node().connected_addrs()[0]).await;
}

// Check the peer counts have been updated.
let node_clone = node.clone();
deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0);
deadline!(Duration::from_secs(5), move || node.tcp().num_connected() == 0);
deadline!(Duration::from_secs(5), move || peer.node().num_connected() == 0);

}
};

// Node side disconnect.
($($node_type:ident |> $peer_type:ident $(= $attr:meta)?),*) => {
mod disconnect_node_side {
$(
test_disconnect!($node_type, $peer_type, true, $($attr)?);
)*
}
};

// Peer side disconnect.
($($node_type:ident <| $peer_type:ident $(= $attr:meta)?),*) => {
mod disconnect_peer_side {
$(
test_disconnect!($node_type, $peer_type, false, $($attr)?);
)*
}
};
}

mod client {
// Full node disconnects from synthetic peer.
test_disconnect! {
client |> client,
client |> validator,
client |> prover
}

// Synthetic peer disconnects from the full node.
test_disconnect! {
client <| client,
client <| validator,
client <| prover
}
}

mod prover {
// Full node disconnects from synthetic peer.
test_disconnect! {
prover |> client,
prover |> validator,
prover |> prover
}

// Synthetic peer disconnects from the full node.
test_disconnect! {
prover <| client,
prover <| validator,
prover <| prover
}
}

mod validator {
// Full node disconnects from synthetic peer.
test_disconnect! {
validator |> client,
validator |> validator,
validator |> prover
}

// Synthetic peer disconnects from the full node.
test_disconnect! {
validator <| client,
validator <| validator,
validator <| prover
}
}

#[tokio::test(flavor = "multi_thread")]
async fn duplicate_disconnect_attempts() {
// common::initialise_logger(3);

// Spin up 2 full nodes.
let node1 = validator().await;
let node2 = validator().await;
let addr2 = node2.tcp().listening_addr().unwrap();

// Connect node1 to node2.
assert!(node1.router().connect(addr2).unwrap().await.unwrap());

// Prepare disconnect attempts.
let node1_clone = node1.clone();
let disconn1 = tokio::spawn(async move { node1_clone.router().disconnect(addr2).await.unwrap() });
let node1_clone = node1.clone();
let disconn2 = tokio::spawn(async move { node1_clone.router().disconnect(addr2).await.unwrap() });
let node1_clone = node1.clone();
let disconn3 = tokio::spawn(async move { node1_clone.router().disconnect(addr2).await.unwrap() });

// Attempt to disconnect the 1st node from the other one several times at once.
let (result1, result2, result3) = tokio::join!(disconn1, disconn2, disconn3);
// A small anti-flakiness buffer.

// Count the successes.
let mut successes = 0;
if result1.unwrap() {
successes += 1;
}
if result2.unwrap() {
successes += 1;
}
if result3.unwrap() {
successes += 1;
}

// There may only be a single success.
assert_eq!(successes, 1);

// Connection checks.
let node1_clone = node1.clone();
deadline!(Duration::from_secs(5), move || node1_clone.router().number_of_connected_peers() == 0);
let node2_clone = node2.clone();
deadline!(Duration::from_secs(5), move || node2_clone.router().number_of_connected_peers() == 0);
}
Loading

0 comments on commit 767afbf

Please sign in to comment.