Skip to content

Commit

Permalink
feat: introduce OnConnect
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Apr 26, 2023
1 parent 3a76970 commit 1077af2
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 66 deletions.
7 changes: 5 additions & 2 deletions node/router/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@
use crate::{Heartbeat, Inbound, Outbound};
use snarkos_node_messages::Message;
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake},
protocols::{Disconnect, Handshake, OnConnect},
P2P,
};
use snarkvm::prelude::Network;

use core::time::Duration;

#[async_trait]
pub trait Routing<N: Network>: P2P + Disconnect + Handshake + Inbound<N> + Outbound<N> + Heartbeat<N> {
pub trait Routing<N: Network>:
P2P + Disconnect + OnConnect + Handshake + Inbound<N> + Outbound<N> + Heartbeat<N>
{
/// Initialize the routing.
async fn initialize_routing(&self) {
// Enable the TCP protocols.
self.enable_handshake().await;
self.enable_reading().await;
self.enable_writing().await;
self.enable_disconnect().await;
self.enable_on_connect().await;
// Enable the TCP listener. Note: This must be called after the above protocols.
self.enable_listener().await;
// Initialize the heartbeat.
Expand Down
18 changes: 9 additions & 9 deletions node/router/tests/common/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use snarkos_node_messages::{
DisconnectReason,
Message,
MessageCodec,
Ping,
Pong,
UnconfirmedSolution,
UnconfirmedTransaction,
};
use snarkos_node_router::{Heartbeat, Inbound, Outbound, Router, Routing};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, Reading, Writing},
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
Connection,
ConnectionSide,
Tcp,
Expand All @@ -36,7 +35,6 @@ use snarkos_node_tcp::{
use snarkvm::prelude::{Block, EpochChallenge, Header, Network, ProverSolution, Transaction};

use async_trait::async_trait;
use futures_util::sink::SinkExt;
use std::{io, net::SocketAddr};
use tracing::*;

Expand Down Expand Up @@ -73,17 +71,19 @@ impl<N: Network> Handshake for TestRouter<N> {
let conn_side = connection.side();
let stream = self.borrow_stream(&mut connection);
let genesis_header = *sample_genesis_block().header();
let (peer_ip, mut framed) = self.router().handshake(peer_addr, stream, conn_side, genesis_header).await?;

// Send the first `Ping` message to the peer.
let message = Message::Ping(Ping::new(self.node_type(), None));
trace!("Sending '{}' to '{peer_ip}'", message.name());
framed.send(message).await?;
self.router().handshake(peer_addr, stream, conn_side, genesis_header).await?;

Ok(connection)
}
}

#[async_trait]
impl<N: Network> OnConnect for TestRouter<N> {
async fn on_connect(&self, _peer_addr: SocketAddr) {
// This behavior is currently not tested.
}
}

#[async_trait]
impl<N: Network> Disconnect for TestRouter<N> {
/// Any extra operations to be performed during a disconnect.
Expand Down
2 changes: 1 addition & 1 deletion node/src/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use snarkos_node_messages::{
use snarkos_node_rest::Rest;
use snarkos_node_router::{Heartbeat, Inbound, Outbound, Router, Routing};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, Reading, Writing},
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::prelude::{
Expand Down
38 changes: 20 additions & 18 deletions node/src/beacon/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,11 @@

use super::*;

use snarkos_node_messages::{
BlockRequest,
BlockResponse,
DataBlocks,
DisconnectReason,
Message,
MessageCodec,
Ping,
Pong,
};
use snarkos_node_messages::{BlockRequest, BlockResponse, DataBlocks, DisconnectReason, Message, MessageCodec, Pong};
use snarkos_node_router::Routing;
use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
use snarkvm::prelude::{error, EpochChallenge, Header};

use futures_util::sink::SinkExt;
use std::{io, net::SocketAddr};

impl<N: Network, C: ConsensusStorage<N>> P2P for Beacon<N, C> {
Expand All @@ -49,23 +39,35 @@ impl<N: Network, C: ConsensusStorage<N>> Handshake for Beacon<N, C> {
let conn_side = connection.side();
let stream = self.borrow_stream(&mut connection);
let genesis_header = self.ledger.get_header(0).map_err(|e| error(format!("{e}")))?;
let (peer_ip, mut framed) = self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;
self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

Ok(connection)
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> OnConnect for Beacon<N, C>
where
Self: Outbound<N>,
{
async fn on_connect(&self, peer_addr: SocketAddr) {
let peer_ip = if let Some(ip) = self.router.resolve_to_listener(&peer_addr) {
ip
} else {
return;
};

// Retrieve the block locators.
let block_locators = match crate::helpers::get_block_locators(&self.ledger) {
Ok(block_locators) => Some(block_locators),
Err(e) => {
error!("Failed to get block locators: {e}");
return Err(error(format!("Failed to get block locators: {e}")));
return;
}
};

// Send the first `Ping` message to the peer.
let message = Message::Ping(Ping::new(self.node_type(), block_locators));
trace!("Sending '{}' to '{peer_ip}'", message.name());
framed.send(message).await?;

Ok(connection)
self.send_ping(peer_ip, block_locators);
}
}

Expand Down
2 changes: 1 addition & 1 deletion node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use snarkos_account::Account;
use snarkos_node_messages::{Message, NodeType, UnconfirmedSolution};
use snarkos_node_router::{Heartbeat, Inbound, Outbound, Router, Routing};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, Reading, Writing},
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::prelude::{Block, CoinbasePuzzle, ConsensusStorage, EpochChallenge, Header, Network, ProverSolution};
Expand Down
27 changes: 19 additions & 8 deletions node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

use super::*;

use snarkos_node_messages::{BlockRequest, DisconnectReason, MessageCodec, Ping, Pong, UnconfirmedTransaction};
use snarkos_node_messages::{BlockRequest, DisconnectReason, MessageCodec, Pong, UnconfirmedTransaction};
use snarkos_node_router::Routing;
use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
use snarkvm::prelude::{Network, Transaction};

use futures_util::sink::SinkExt;
use std::{io, net::SocketAddr, time::Duration};

impl<N: Network, C: ConsensusStorage<N>> P2P for Client<N, C> {
Expand All @@ -40,17 +39,29 @@ impl<N: Network, C: ConsensusStorage<N>> Handshake for Client<N, C> {
let conn_side = connection.side();
let stream = self.borrow_stream(&mut connection);
let genesis_header = *self.genesis.header();
let (peer_ip, mut framed) = self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

// Send the first `Ping` message to the peer.
let message = Message::Ping(Ping::new(self.node_type(), None));
trace!("Sending '{}' to '{peer_ip}'", message.name());
framed.send(message).await?;
self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

Ok(connection)
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> OnConnect for Client<N, C>
where
Self: Outbound<N>,
{
async fn on_connect(&self, peer_addr: SocketAddr) {
let peer_ip = if let Some(ip) = self.router.resolve_to_listener(&peer_addr) {
ip
} else {
return;
};

// Send the first `Ping` message to the peer.
self.send_ping(peer_ip, None);
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> Disconnect for Client<N, C> {
/// Any extra operations to be performed during a disconnect.
Expand Down
2 changes: 1 addition & 1 deletion node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use snarkos_account::Account;
use snarkos_node_messages::{Data, Message, NodeType, UnconfirmedSolution};
use snarkos_node_router::{Heartbeat, Inbound, Outbound, Router, Routing};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, Reading, Writing},
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::prelude::{Block, CoinbasePuzzle, ConsensusStorage, EpochChallenge, Header, Network, ProverSolution};
Expand Down
35 changes: 19 additions & 16 deletions node/src/prover/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@

use super::*;

use snarkos_node_messages::{
BlockRequest,
DisconnectReason,
Message,
MessageCodec,
Ping,
Pong,
UnconfirmedTransaction,
};
use snarkos_node_messages::{BlockRequest, DisconnectReason, Message, MessageCodec, Pong, UnconfirmedTransaction};
use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
use snarkvm::prelude::{Network, Transaction};

use futures_util::sink::SinkExt;
use std::{io, net::SocketAddr};

impl<N: Network, C: ConsensusStorage<N>> P2P for Prover<N, C> {
Expand All @@ -47,17 +38,29 @@ impl<N: Network, C: ConsensusStorage<N>> Handshake for Prover<N, C> {
let conn_side = connection.side();
let stream = self.borrow_stream(&mut connection);
let genesis_header = *self.genesis.header();
let (peer_ip, mut framed) = self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

// Send the first `Ping` message to the peer.
let message = Message::Ping(Ping::new(self.node_type(), None));
trace!("Sending '{}' to '{peer_ip}'", message.name());
framed.send(message).await?;
self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

Ok(connection)
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> OnConnect for Prover<N, C>
where
Self: Outbound<N>,
{
async fn on_connect(&self, peer_addr: SocketAddr) {
let peer_ip = if let Some(ip) = self.router.resolve_to_listener(&peer_addr) {
ip
} else {
return;
};

// Send the first `Ping` message to the peer.
self.send_ping(peer_ip, None);
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> Disconnect for Prover<N, C> {
/// Any extra operations to be performed during a disconnect.
Expand Down
2 changes: 1 addition & 1 deletion node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use snarkos_node_messages::{BlockRequest, Message, NodeType, PuzzleResponse, Unc
use snarkos_node_rest::Rest;
use snarkos_node_router::{Heartbeat, Inbound, Outbound, Router, Routing};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, Reading, Writing},
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::prelude::{Block, ConsensusStorage, Header, Network, ProverSolution};
Expand Down
28 changes: 19 additions & 9 deletions node/src/validator/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ use snarkos_node_messages::{
DisconnectReason,
Message,
MessageCodec,
Ping,
Pong,
UnconfirmedTransaction,
};
use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
use snarkvm::prelude::{error, EpochChallenge, Network, Transaction};

use futures_util::sink::SinkExt;
use std::{io, net::SocketAddr, time::Duration};

impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> {
Expand All @@ -50,23 +48,35 @@ impl<N: Network, C: ConsensusStorage<N>> Handshake for Validator<N, C> {
let conn_side = connection.side();
let stream = self.borrow_stream(&mut connection);
let genesis_header = self.ledger.get_header(0).map_err(|e| error(format!("{e}")))?;
let (peer_ip, mut framed) = self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;
self.router.handshake(peer_addr, stream, conn_side, genesis_header).await?;

Ok(connection)
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> OnConnect for Validator<N, C>
where
Self: Outbound<N>,
{
async fn on_connect(&self, peer_addr: SocketAddr) {
let peer_ip = if let Some(ip) = self.router.resolve_to_listener(&peer_addr) {
ip
} else {
return;
};

// Retrieve the block locators.
let block_locators = match crate::helpers::get_block_locators(&self.ledger) {
Ok(block_locators) => Some(block_locators),
Err(e) => {
error!("Failed to get block locators: {e}");
return Err(error(format!("Failed to get block locators: {e}")));
return;
}
};

// Send the first `Ping` message to the peer.
let message = Message::Ping(Ping::new(self.node_type(), block_locators));
trace!("Sending '{}' to '{peer_ip}'", message.name());
framed.send(message).await?;

Ok(connection)
self.send_ping(peer_ip, block_locators);
}
}

Expand Down
3 changes: 3 additions & 0 deletions node/tcp/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use crate::connections::Connection;

mod disconnect;
mod handshake;
mod on_connect;
mod reading;
mod writing;

pub use disconnect::Disconnect;
pub use handshake::Handshake;
pub use on_connect::OnConnect;
pub use reading::Reading;
pub use writing::Writing;

Expand All @@ -40,6 +42,7 @@ pub(crate) struct Protocols {
pub(crate) handshake: OnceBox<ProtocolHandler<Connection, io::Result<Connection>>>,
pub(crate) reading: OnceBox<ProtocolHandler<Connection, io::Result<Connection>>>,
pub(crate) writing: OnceBox<writing::WritingHandler>,
pub(crate) on_connect: OnceBox<ProtocolHandler<SocketAddr, ()>>,
pub(crate) disconnect: OnceBox<ProtocolHandler<SocketAddr, ()>>,
}

Expand Down
Loading

0 comments on commit 1077af2

Please sign in to comment.