Skip to content

Commit

Permalink
[network] emit metadata on new and lost peers
Browse files Browse the repository at this point in the history
When a new connection is formed, there is a lot of information available
and rather than just expose what we need for now, let's expose all that
information and let the recipients take what they need.

1. We have already extended the data here once if not twice
2. We have another requirement to extend it to include trusted peer / role

The interesting thing is that state sync and mempool already translate
these events into yet another event so there will need to be ensuing PRs
to address that.

Closes: aptos-labs#7102
  • Loading branch information
davidiw authored and bors-libra committed Jan 4, 2021
1 parent 662713c commit dbf71d0
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 88 deletions.
18 changes: 7 additions & 11 deletions mempool/src/tests/shared_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use diem_config::{
network_id::{NetworkContext, NetworkId, NodeNetworkId},
};
use diem_infallible::{Mutex, RwLock};
use diem_network_address::NetworkAddress;
use diem_types::{
transaction::{GovernanceRole, SignedTransaction},
PeerId,
Expand All @@ -37,6 +36,7 @@ use network::{
PeerManagerNotification, PeerManagerRequest, PeerManagerRequestSender,
},
protocols::network::{NewNetworkEvents, NewNetworkSender},
transport::ConnectionMetadata,
DisconnectReason, ProtocolId,
};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -289,25 +289,21 @@ impl SharedMempoolNetwork {
}

fn send_new_peer_event(&mut self, receiver: &PeerId, new_peer: &PeerId, inbound: bool) {
let origin = if inbound {
let mut metadata = ConnectionMetadata::mock(*new_peer);
metadata.origin = if inbound {
ConnectionOrigin::Inbound
} else {
ConnectionOrigin::Outbound
};
let notif = ConnectionNotification::NewPeer(
*new_peer,
NetworkAddress::mock(),
origin,
NetworkContext::mock(),
);

let notif = ConnectionNotification::NewPeer(metadata, NetworkContext::mock());
self.send_connection_event(receiver, notif)
}

fn send_lost_peer_event(&mut self, receiver: &PeerId, lost_peer: &PeerId) {
let notif = ConnectionNotification::LostPeer(
*lost_peer,
NetworkAddress::mock(),
ConnectionOrigin::Inbound,
ConnectionMetadata::mock(*lost_peer),
NetworkContext::mock(),
DisconnectReason::ConnectionLost,
);
self.send_connection_event(receiver, notif)
Expand Down
16 changes: 9 additions & 7 deletions network/src/connectivity_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,15 +651,17 @@ where
"Connection notification"
);
match notif {
peer_manager::ConnectionNotification::NewPeer(peer_id, addr, _origin, _context) => {
peer_manager::ConnectionNotification::NewPeer(metadata, _context) => {
let peer_id = metadata.remote_peer_id;
counters::peer_connected(&self.network_context, &peer_id, 1);
self.connected.insert(peer_id, addr);
self.connected.insert(peer_id, metadata.addr);

// Cancel possible queued dial to this peer.
self.dial_states.remove(&peer_id);
self.dial_queue.remove(&peer_id);
}
peer_manager::ConnectionNotification::LostPeer(peer_id, addr, _origin, _reason) => {
peer_manager::ConnectionNotification::LostPeer(metadata, _context, _reason) => {
let peer_id = metadata.remote_peer_id;
if let Some(stored_addr) = self.connected.get(&peer_id) {
// Remove node from connected peers list.

Expand All @@ -668,24 +670,24 @@ where
info!(
NetworkSchema::new(&self.network_context)
.remote_peer(&peer_id)
.network_address(&addr),
.network_address(&metadata.addr),
stored_addr = stored_addr,
"{} Removing peer '{}' addr: {}, vs event addr: {}",
self.network_context,
peer_id.short_str(),
stored_addr,
addr
metadata.addr
);
self.connected.remove(&peer_id);
} else {
info!(
NetworkSchema::new(&self.network_context)
.remote_peer(&peer_id)
.network_address(&addr),
.network_address(&metadata.addr),
"{} Ignoring stale lost peer event for peer: {}, addr: {}",
self.network_context,
peer_id.short_str(),
addr
metadata.addr
);
}
}
Expand Down
21 changes: 8 additions & 13 deletions network/src/connectivity_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::*;
use crate::{
peer::DisconnectReason,
peer_manager::{conn_notifs_channel, ConnectionRequest},
transport::ConnectionMetadata,
};
use channel::{diem_channel, message_queues::QueueStyle};
use core::str::FromStr;
Expand All @@ -13,7 +14,6 @@ use diem_crypto::{test_utils::TEST_SEED, x25519, Uniform};
use diem_logger::info;
use diem_network_address::NetworkAddress;
use futures::SinkExt;
use netcore::transport::ConnectionOrigin;
use rand::rngs::StdRng;
use std::io;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -124,12 +124,9 @@ async fn send_new_peer_await_delivery(
notif_peer_id: PeerId,
address: NetworkAddress,
) {
let notif = peer_manager::ConnectionNotification::NewPeer(
notif_peer_id,
address,
ConnectionOrigin::Inbound,
NetworkContext::mock(),
);
let mut metadata = ConnectionMetadata::mock(notif_peer_id);
metadata.addr = address;
let notif = peer_manager::ConnectionNotification::NewPeer(metadata, NetworkContext::mock());
send_notification_await_delivery(connection_notifs_tx, peer_id, notif).await;
}

Expand All @@ -140,12 +137,10 @@ async fn send_lost_peer_await_delivery(
address: NetworkAddress,
reason: DisconnectReason,
) {
let notif = peer_manager::ConnectionNotification::LostPeer(
notif_peer_id,
address,
ConnectionOrigin::Inbound,
reason,
);
let mut metadata = ConnectionMetadata::mock(notif_peer_id);
metadata.addr = address;
let notif =
peer_manager::ConnectionNotification::LostPeer(metadata, NetworkContext::mock(), reason);
send_notification_await_delivery(connection_notifs_tx, peer_id, notif).await;
}

Expand Down
18 changes: 6 additions & 12 deletions network/src/peer_manager/conn_notifs_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,22 @@ pub fn new() -> (Sender, Receiver) {
#[cfg(test)]
mod test {
use super::*;
use crate::peer::DisconnectReason;
use crate::{peer::DisconnectReason, transport::ConnectionMetadata};
use diem_config::network_id::NetworkContext;
use diem_network_address::NetworkAddress;
use futures::{executor::block_on, future::FutureExt, stream::StreamExt};
use netcore::transport::ConnectionOrigin;

fn send_new_peer(sender: &mut Sender, peer_id: PeerId) {
let notif = ConnectionNotification::NewPeer(
peer_id,
NetworkAddress::mock(),
ConnectionOrigin::Inbound,
ConnectionMetadata::mock(peer_id),
NetworkContext::mock(),
);
sender.push(peer_id, notif).unwrap()
}

fn send_lost_peer(sender: &mut Sender, peer_id: PeerId, reason: DisconnectReason) {
let notif = ConnectionNotification::LostPeer(
peer_id,
NetworkAddress::mock(),
ConnectionOrigin::Inbound,
ConnectionMetadata::mock(peer_id),
NetworkContext::mock_with_peer_id(peer_id),
reason,
);
sender.push(peer_id, notif).unwrap()
Expand All @@ -61,9 +56,8 @@ mod test {

// Ensure that only the last message is received.
let notif = ConnectionNotification::LostPeer(
peer_id_a,
NetworkAddress::mock(),
ConnectionOrigin::Inbound,
ConnectionMetadata::mock(peer_id_a),
NetworkContext::mock_with_peer_id(peer_id_a),
DisconnectReason::Requested,
);
assert_eq!(receiver.select_next_some().await, notif,);
Expand Down
31 changes: 10 additions & 21 deletions network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,12 @@ pub enum ConnectionRequest {
),
}

#[derive(Clone, PartialEq, Eq, Serialize)]
#[derive(Clone, PartialEq, Serialize)]
pub enum ConnectionNotification {
/// Connection with a new peer has been established.
NewPeer(
PeerId,
NetworkAddress,
ConnectionOrigin,
Arc<NetworkContext>,
),
NewPeer(ConnectionMetadata, Arc<NetworkContext>),
/// Connection to a peer has been terminated. This could have been triggered from either end.
LostPeer(PeerId, NetworkAddress, ConnectionOrigin, DisconnectReason),
LostPeer(ConnectionMetadata, Arc<NetworkContext>, DisconnectReason),
}

impl std::fmt::Debug for ConnectionNotification {
Expand All @@ -110,11 +105,11 @@ impl std::fmt::Debug for ConnectionNotification {
impl std::fmt::Display for ConnectionNotification {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ConnectionNotification::NewPeer(peer, addr, origin, context) => {
write!(f, "[{},{},{},{}]", peer, addr, origin, context)
ConnectionNotification::NewPeer(metadata, context) => {
write!(f, "[{},{}]", metadata, context)
}
ConnectionNotification::LostPeer(peer, addr, origin, reason) => {
write!(f, "[{},{},{},{}]", peer, addr, origin, reason)
ConnectionNotification::LostPeer(metadata, context, reason) => {
write!(f, "[{},{},{}]", metadata, context, reason)
}
}
}
Expand Down Expand Up @@ -526,9 +521,8 @@ where
// but does not affect correctness.
if !self.active_peers.contains_key(&peer_id) {
let notif = ConnectionNotification::LostPeer(
peer_id,
lost_conn_metadata.addr.clone(),
lost_conn_metadata.origin,
lost_conn_metadata,
self.network_context.clone(),
reason,
);
self.send_conn_notification(peer_id, notif);
Expand Down Expand Up @@ -780,12 +774,7 @@ where
.insert(peer_id, (conn_meta.clone(), network_reqs_tx));
// Send NewPeer notification to connection event handlers.
if send_new_peer_notification {
let notif = ConnectionNotification::NewPeer(
peer_id,
conn_meta.addr.clone(),
conn_meta.origin,
self.network_context.clone(),
);
let notif = ConnectionNotification::NewPeer(conn_meta, self.network_context.clone());
self.send_conn_notification(peer_id, notif);
}
}
Expand Down
7 changes: 2 additions & 5 deletions network/src/peer_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,7 @@ fn test_dial_disconnect() {

// Expect NewPeer notification from PeerManager.
let conn_notif = conn_status_rx.next().await.unwrap();
assert!(matches!(
conn_notif,
ConnectionNotification::NewPeer(_, _, _, _)
));
assert!(matches!(conn_notif, ConnectionNotification::NewPeer(_, _)));

// Send DisconnectPeer request to PeerManager.
let (disconnect_resp_tx, disconnect_resp_rx) = oneshot::channel();
Expand Down Expand Up @@ -621,7 +618,7 @@ fn test_dial_disconnect() {
let conn_notif = conn_status_rx.next().await.unwrap();
assert!(matches!(
conn_notif,
ConnectionNotification::LostPeer(_, _, _, _)
ConnectionNotification::LostPeer(_, _, _)
));

// Sender of disconnect request should receive acknowledgement once connection is closed.
Expand Down
8 changes: 2 additions & 6 deletions network/src/protocols/health_checker/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ use crate::{
network::{NewNetworkEvents, NewNetworkSender},
rpc::InboundRpcRequest,
},
transport::ConnectionMetadata,
ProtocolId,
};
use channel::{diem_channel, message_queues::QueueStyle};
use diem_config::{config::RoleType, network_id::NetworkId};
use diem_network_address::NetworkAddress;
use futures::sink::SinkExt;
use netcore::transport::ConnectionOrigin;
use std::str::FromStr;
use tokio::runtime::Runtime;

const PING_TIMEOUT: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -176,9 +174,7 @@ async fn send_new_peer_notification(
) {
let (delivered_tx, delivered_rx) = oneshot::channel();
let notif = peer_manager::ConnectionNotification::NewPeer(
peer_id,
NetworkAddress::from_str("/ip6/::1/tcp/8081").unwrap(),
ConnectionOrigin::Inbound,
ConnectionMetadata::mock(peer_id),
NetworkContext::mock(),
);
connection_notifs_tx
Expand Down
8 changes: 4 additions & 4 deletions network/src/protocols/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ fn peer_mgr_notif_to_event<TMessage: Message>(

fn control_msg_to_event<TMessage>(notif: ConnectionNotification) -> Event<TMessage> {
match notif {
ConnectionNotification::NewPeer(peer_id, _addr, origin, _context) => {
Event::NewPeer(peer_id, origin)
ConnectionNotification::NewPeer(metadata, _context) => {
Event::NewPeer(metadata.remote_peer_id, metadata.origin)
}
ConnectionNotification::LostPeer(peer_id, _addr, origin, _reason) => {
Event::LostPeer(peer_id, origin)
ConnectionNotification::LostPeer(metadata, _context, _reason) => {
Event::LostPeer(metadata.remote_peer_id, metadata.origin)
}
}
}
Expand Down
16 changes: 7 additions & 9 deletions state-synchronizer/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use network::{
direct_send::Message,
network::{NewNetworkEvents, NewNetworkSender},
},
transport::ConnectionMetadata,
DisconnectReason, ProtocolId,
};
use network_builder::builder::NetworkBuilder;
Expand Down Expand Up @@ -427,18 +428,15 @@ impl SynchronizerEnv {
direction: ConnectionOrigin,
) {
let sender_id = self.get_peer_network_id(sender);
let mut metadata = ConnectionMetadata::mock(sender_id);
metadata.origin = direction;

let notif = if new_peer {
ConnectionNotification::NewPeer(
sender_id,
NetworkAddress::mock(),
direction,
NetworkContext::mock(),
)
ConnectionNotification::NewPeer(metadata, NetworkContext::mock())
} else {
ConnectionNotification::LostPeer(
sender_id,
NetworkAddress::mock(),
direction,
metadata,
NetworkContext::mock(),
DisconnectReason::ConnectionLost,
)
};
Expand Down

0 comments on commit dbf71d0

Please sign in to comment.