Skip to content

Commit

Permalink
Modify the network into new interface
Browse files Browse the repository at this point in the history
Old:
ReceiveOperations

New:
Operations: receive/send asked operation
OperationBatch: receive/send operations ids (batch)
AskOperation: receive/send require operations by ids
  • Loading branch information
adrien-zinger committed Mar 18, 2022
1 parent f127d6d commit 038cb5f
Show file tree
Hide file tree
Showing 14 changed files with 663 additions and 339 deletions.
13 changes: 12 additions & 1 deletion massa-models/src/operation.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright (c) 2022 MASSA LABS <[email protected]>

use crate::constants::{ADDRESS_SIZE_BYTES, OPERATION_ID_SIZE_BYTES};
use crate::prehash::{PreHashed, Set};
use crate::signed::{Id, Signable, Signed};
use crate::{
constants::{ADDRESS_SIZE_BYTES, OPERATION_ID_SIZE_BYTES},
node::NodeId,
};
use crate::{
serialization::{
array_from_slice, DeserializeCompact, DeserializeVarInt, SerializeCompact, SerializeVarInt,
Expand Down Expand Up @@ -478,6 +481,14 @@ impl Operation {
}
}

/// Data structure forwarded in the network after asking [Operation].
/// Option is None if the asked node hasn't the operation.
pub type AskedOperations = std::collections::HashMap<OperationId, Option<SignedOperation>>;
/// Internal data structure describing the [Operation] we do want from which `NodeId`.
pub type WantOperations = std::collections::HashMap<NodeId, Vec<OperationId>>;
/// Same as wanted operation but used to propagate `OperationId` through `NodeId`
pub type OperationBatches = std::collections::HashMap<NodeId, Vec<OperationId>>;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
194 changes: 178 additions & 16 deletions massa-network-exports/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,170 @@
use crate::{BootstrapPeers, Peers};
//! Declaration of the public and internal Events and Commands used by
//! `massa-network` that allow external communication with other nodes.
//!
//! # Operations workflow
//! An operation batch can be send or received by the massa node. Which
//! operation is asked is managed by the `NetworkWorker`.
//!
//! Other modules has the access to all commands but the usage if they want to
//! send operation that they just noticed, they should use the command
//! [NetworkCommand::SendOperationBatch].
//!
//! ```txt
//! OurNode NetworkWorker NodeWorker
//! | | |
//! +------------------------------------------- Creation of some operations
//! # | | Extends the pool ect...
//! # | |
//! +-------------->| | Use NetworkCommand::SendOperationBatch(Vec<OperationId>)
//! | +--------------->| Use NodeCommand::SendOperationBatch(Vec<OperationId>)
//! | | #
//! | | # Propagate the batch through the network
//! ```
//!
//! When receiving an operation batch from the network, the `NodeWorker` will
//! inform the `NetworkWorker` that some potentials new operations are in
//! transit and can be requested.
//!
//! The event used for receiving informations from the newtork are
//! `ReceivedOperations`, `ReceivedAskForBlocks` and `ReceivedOperationBatch`.
//!
//! The network will inform the node that new operations can transites with
//! the propagation method `SendOperationBatch` that we defined just before.
//! Then, the node will manage if he ask or not the operations inside the
//! `NetworkWorker` on node-worker event `ReceivedOperationBatch`
//!
//! ```txt
//! Asking for operations
//! ---
//!
//! NodeWorker NetworkWorker ProtocolWorker
//! | | |
//! +------------------------------------------------------- Receive a batch (NodeEvent...ReceivedOperationBatch)
//! . | |
//! +-------------->| | NetworkWorker react on previous event.
//! | +------------------------># - Check in the protocol if we already have operations
//! | | # or not. Build the vector of requirement.
//! | | # - Update the `NodeInfo` of the sender.
//! | | #
//! | |<------------------------+ `NetworkCommand::AskForOperations(WantedOperations)`
//! |<--------------+ | `NodeCommand::AskForOperations(Vec<OperationId>)`
//! | | |
//! | | | > Use the `WantOperations` structure.
//! | | | > Ask to the node that sent the batch a list of operations
//! | | | > that we don't already know with `NodeCommand::AskForOperations`.
//! | | |
//! +-------------->| | `NodeEvent::ReceivedOperations`
//! | +------------------------># `NetworkEvent::ReceivedOperations`
//! | | #
//! | | # > Receive the full operations inside the structure
//! | | # > `AskedOperation`.
//! | | #
//! | | # Update local state and the `NodeInfo` of the sender if required.
//! | | |
//! | |<------------------------+ `NetworkCommand::SendOperationBatch`
//! |<--------------+ | `NodeCommand::SendOperationBatch`
//! | | |
//! | | | > Propagate the batch through the network (send to nodes
//! | | | > that don't already know the local operations (we suppose that
//! | | | > from previous discussions with distant node)
//! ```
//!
//! See also: [WantOperations], [AskedOperations]
//! Look at `massa-protocol-worker/src/node-info.rs` to look further how we
//! remember wich node know what.
//!
//! On receive the command from the network
//! `NodeEvent(..ReceivedAskForOperations)` the `NetworkWorker` will build a
//! wanted operation structure with None or Some depending the node has it.
//!
//! ```txt
//! NodeWorker NetworkWorker ProtocolWorker
//! | | |
//! +------------------------------------------------------ Receive an "ReceivedAskForOperations" event
//! . | |
//! +-------------->| | `NodeEvent::ReceivedAskForOperations`
//! | +------------------------># `NetworkEvent::ReceivedAskForOperations`
//! | | #
//! | | # > Build a `WantedOperation` structure for the required
//! | | # > operation
//! | | # > Update the `NodeInfo` of the sender
//! | | #
//! | |<------------------------+ `NetworkCommand::SendOperations`
//! |<--------------+ | `NodeCommand::SendOperations`
//! | | |
//! ```
use crate::{BootstrapPeers, ConnectionClosureReason, Peers};
use massa_models::{
composite::PubkeySig, node::NodeId, stats::NetworkStats, Block, BlockId, OperationId,
SignedEndorsement, SignedHeader, SignedOperation,
composite::PubkeySig,
node::NodeId,
operation::{AskedOperations, OperationBatches, WantOperations},
stats::NetworkStats,
Block, BlockId, OperationId, SignedEndorsement, SignedHeader,
};
use std::{collections::HashMap, net::IpAddr};
use tokio::sync::oneshot;

#[derive(Clone, Debug)]
pub enum NodeCommand {
/// Send given peer list to node.
SendPeerList(Vec<IpAddr>),
/// Send that block to node.
SendBlock(Block),
/// Send the header of a block to a node.
SendBlockHeader(SignedHeader),
/// Ask for a block from that node.
AskForBlocks(Vec<BlockId>),
/// Close the node worker.
Close(ConnectionClosureReason),
/// Block not found
BlockNotFound(BlockId),
/// Send full Operations (send to a node that previously asked for)
SendOperations(AskedOperations),
/// Send a batch of operation ids
SendOperationBatch(Vec<OperationId>),
/// Ask for a set of operations
AskForOperations(Vec<OperationId>),
/// Endorsements
SendEndorsements(Vec<SignedEndorsement>),
}

/// Event types that node worker can emit
/// Append on receive something from inside and outside.
/// Outside init with `Received` prefix.
#[derive(Clone, Debug)]
pub enum NodeEventType {
/// Node we are connected to asked for advertised peers
AskedPeerList,
/// Node we are connected to sent peer list
ReceivedPeerList(Vec<IpAddr>),
/// Node we are connected to sent block
ReceivedBlock(Block),
/// Node we are connected to sent block header
ReceivedBlockHeader(SignedHeader),
/// Node we are connected to asks for a block.
ReceivedAskForBlocks(Vec<BlockId>),
/// Didn't found given block,
BlockNotFound(BlockId),
/// Operation
ReceivedOperations(AskedOperations),
/// Received operation batch
ReceivedOperationBatch(Vec<OperationId>),
/// Receive a list of wanted operations
ReceivedAskForOperations(Vec<OperationId>),
/// Receive a set of endorsement
ReceivedEndorsements(Vec<SignedEndorsement>),
}

/// Events node worker can emit.
/// Events are a tuple linking a node id to an event type
#[derive(Clone, Debug)]
pub struct NodeEvent(pub NodeId, pub NodeEventType);

/// Commands that the worker can execute
#[derive(Debug)]
pub enum NetworkCommand {
/// Ask for a block from a node.
/// Ask for a block to a node.
AskForBlocks {
list: HashMap<NodeId, Vec<BlockId>>,
},
Expand All @@ -33,11 +188,6 @@ pub enum NetworkCommand {
node: NodeId,
block_id: BlockId,
},
/// Require to the network to send a list of operation
SendOperations {
node: NodeId,
operations: HashMap<OperationId, Option<SignedOperation>>,
},
SendEndorsements {
node: NodeId,
endorsements: Vec<SignedEndorsement>,
Expand All @@ -49,6 +199,19 @@ pub enum NetworkCommand {
GetStats {
response_tx: oneshot::Sender<NetworkStats>,
},
/// Require to the network to send a list of full operations
SendOperations {
node: NodeId,
operations: AskedOperations,
},
/// Receive previously asked Operation
SendOperationBatch {
batches: OperationBatches,
},
/// Ask for operation
AskForOperations {
wishlist: WantOperations,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -78,17 +241,16 @@ pub enum NetworkEvent {
/// Receive previously asked Operation
ReceivedOperations {
node: NodeId,
operations: HashMap<OperationId, Option<SignedOperation>>,
operations: AskedOperations,
},
/// Receive a batch of operation ids by someone
OperationsBatch {
ReceivedOperationBatch {
node: NodeId,
operations_id: Vec<OperationId>,
operation_ids: Vec<OperationId>,
},
/// Someone ask for operations.
AskedForOperations {
/// Receive a list of asked operations from `node`
ReceiveAskForOperations {
node: NodeId,
list: Vec<OperationId>,
operation_ids: Vec<OperationId>,
},
ReceivedEndorsements {
node: NodeId,
Expand Down
4 changes: 3 additions & 1 deletion massa-network-exports/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Manages a connection with a node
pub use commands::{NetworkCommand, NetworkEvent, NetworkManagementCommand};
pub use commands::{
NetworkCommand, NetworkEvent, NetworkManagementCommand, NodeCommand, NodeEvent, NodeEventType,
};
pub use common::{ConnectionClosureReason, ConnectionId};
pub use error::{HandshakeErrorType, NetworkConnectionErrorType, NetworkError};
pub use establisher::{Establisher, Listener, ReadHalf, WriteHalf};
Expand Down
35 changes: 32 additions & 3 deletions massa-network-exports/src/network_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use crate::{
NetworkEvent, Peers,
};
use massa_models::{
composite::PubkeySig, node::NodeId, stats::NetworkStats, Block, BlockId, OperationId,
SignedEndorsement, SignedHeader, SignedOperation,
composite::PubkeySig,
node::NodeId,
operation::{AskedOperations, OperationBatches, WantOperations},
stats::NetworkStats,
Block, BlockId, SignedEndorsement, SignedHeader,
};
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -139,7 +142,7 @@ impl NetworkCommandSender {
pub async fn send_operations(
&self,
node: NodeId,
operations: HashMap<OperationId, Option<SignedOperation>>,
operations: AskedOperations,
) -> Result<(), NetworkError> {
self.0
.send(NetworkCommand::SendOperations { node, operations })
Expand All @@ -150,6 +153,32 @@ impl NetworkCommandSender {
Ok(())
}

pub async fn send_operations_batches(
&self,
batches: OperationBatches,
) -> Result<(), NetworkError> {
self.0
.send(NetworkCommand::SendOperationBatch { batches })
.await
.map_err(|_| {
NetworkError::ChannelError("could not send SendOperationBatch command".into())
})?;
Ok(())
}

pub async fn send_ask_for_operations(
&self,
wishlist: WantOperations,
) -> Result<(), NetworkError> {
self.0
.send(NetworkCommand::AskForOperations { wishlist })
.await
.map_err(|_| {
NetworkError::ChannelError("could not send AskForOperations command".into())
})?;
Ok(())
}

pub async fn send_endorsements(
&self,
node: NodeId,
Expand Down
Loading

0 comments on commit 038cb5f

Please sign in to comment.