Skip to content

Commit

Permalink
storage: move store to after protocol check
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Mar 21, 2022
1 parent 4e53e5e commit 351fde1
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 216 deletions.
4 changes: 2 additions & 2 deletions massa-consensus-worker/src/tests/mock_protocol_controller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) 2022 MASSA LABS <[email protected]>

use massa_models::{
constants::CHANNEL_SIZE, signed::Signable, storage::Storage, Block, BlockHeader, BlockId,
SerializeCompact, SignedHeader,
constants::CHANNEL_SIZE, signed::Signable, storage::Storage, Block, BlockId, SerializeCompact,
SignedHeader,
};
use massa_protocol_exports::{
ProtocolCommand, ProtocolCommandSender, ProtocolEvent, ProtocolEventReceiver,
Expand Down
5 changes: 3 additions & 2 deletions massa-network-exports/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{BootstrapPeers, Peers};
use massa_models::SignedEndorsement;
use massa_models::SignedHeader;
use massa_models::SignedOperation;
use massa_models::{composite::PubkeySig, node::NodeId, stats::NetworkStats, BlockId};
use massa_models::{composite::PubkeySig, node::NodeId, stats::NetworkStats, Block, BlockId};
use tokio::sync::oneshot;

/// Commands that the worker can execute
Expand Down Expand Up @@ -58,7 +58,8 @@ pub enum NetworkEvent {
/// A block was received
ReceivedBlock {
node: NodeId,
block_id: BlockId,
block: Block,
serialized: Vec<u8>,
},
/// A block header was received
ReceivedBlockHeader {
Expand Down
12 changes: 8 additions & 4 deletions massa-network-worker/src/network_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl EventSender {
pub mod event_impl {
use crate::{network_worker::NetworkWorker, node_worker::NodeCommand};
use massa_logging::massa_trace;
use massa_models::{node::NodeId, BlockId, SignedEndorsement, SignedHeader, SignedOperation};
use massa_models::{
node::NodeId, Block, BlockId, SignedEndorsement, SignedHeader, SignedOperation,
};
use massa_network_exports::{NetworkError, NetworkEvent};
use std::net::IpAddr;
use tracing::{debug, info};
Expand Down Expand Up @@ -108,17 +110,19 @@ pub mod event_impl {
pub async fn on_received_block(
worker: &mut NetworkWorker,
from: NodeId,
block_id: BlockId,
block: Block,
serialized: Vec<u8>,
) -> Result<(), NetworkError> {
massa_trace!(
"network_worker.on_node_event receive NetworkEvent::ReceivedBlock",
{"block_id": block_id, "node": from}
{"block": block, "node": from}
);
if let Err(err) = worker
.event
.send(NetworkEvent::ReceivedBlock {
node: from,
block_id,
block,
serialized,
})
.await
{
Expand Down
4 changes: 2 additions & 2 deletions massa-network-worker/src/network_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ impl NetworkWorker {
NodeEvent(from_node_id, NodeEventType::ReceivedPeerList(lst)) => {
event_impl::on_received_peer_list(self, from_node_id, &lst)?
}
NodeEvent(from_node_id, NodeEventType::ReceivedBlock(data)) => {
event_impl::on_received_block(self, from_node_id, data).await?
NodeEvent(from_node_id, NodeEventType::ReceivedBlock(block, serialized)) => {
event_impl::on_received_block(self, from_node_id, block, serialized).await?
}
NodeEvent(from_node_id, NodeEventType::ReceivedAskForBlocks(list)) => {
event_impl::on_received_ask_for_blocks(self, from_node_id, list).await
Expand Down
10 changes: 3 additions & 7 deletions massa-network-worker/src/node_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use massa_models::{
},
node::NodeId,
signed::Signable,
BlockId, SignedEndorsement, SignedHeader, SignedOperation,
Block, BlockId, SignedEndorsement, SignedHeader, SignedOperation,
};
use massa_models::{SerializeCompact, SerializeVarInt};
use massa_network_exports::{ConnectionClosureReason, NetworkError, NetworkSettings};
Expand Down Expand Up @@ -66,7 +66,7 @@ pub enum NodeEventType {
/// Node we are connected to sent peer list
ReceivedPeerList(Vec<IpAddr>),
/// Node we are connected to sent block
ReceivedBlock(BlockId),
ReceivedBlock(Block, Vec<u8>),
/// Node we are connected to sent block header
ReceivedBlockHeader(SignedHeader),
/// Node we are connected to asks for a block.
Expand Down Expand Up @@ -309,11 +309,7 @@ impl NodeWorker {
"node_worker.run_loop. receive Message::Block",
{"block_id": block.header.content.compute_id()?, "block": block, "node": self.node_id}
);

// TODO: avoid computing id.
let block_id = block.header.content.compute_id()?;
self.storage.store_block(block_id, block.clone(), serialized.unwrap());
self.send_node_event(NodeEvent(self.node_id, NodeEventType::ReceivedBlock(block_id))).await;
self.send_node_event(NodeEvent(self.node_id, NodeEventType::ReceivedBlock(block, serialized.expect("Block should come wiht its serialized form.")))).await;
},
Message::BlockHeader(header) => {
massa_trace!(
Expand Down
18 changes: 4 additions & 14 deletions massa-protocol-exports/src/tests/mock_network_controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
// Copyright (c) 2022 MASSA LABS <[email protected]>

use massa_models::SerializeCompact;
use massa_models::{constants::CHANNEL_SIZE, node::NodeId};
use massa_models::{
storage::Storage, Block, BlockId, SignedEndorsement, SignedHeader, SignedOperation,
};
use massa_models::{Block, BlockId, SignedEndorsement, SignedHeader, SignedOperation};
use massa_network_exports::{
NetworkCommand, NetworkCommandSender, NetworkEvent, NetworkEventReceiver,
};
Expand Down Expand Up @@ -72,19 +69,12 @@ impl MockNetworkController {
.expect("Couldn't send header to protocol.");
}

pub async fn send_block(
&mut self,
source_node_id: NodeId,
block_id: BlockId,
block_and_storage: Option<(Block, Storage)>,
) {
if let Some((block, storage)) = block_and_storage {
storage.store_block(block_id, block.clone(), block.to_bytes_compact().unwrap());
}
pub async fn send_block(&mut self, source_node_id: NodeId, block: Block, serialized: Vec<u8>) {
self.network_event_tx
.send(NetworkEvent::ReceivedBlock {
node: source_node_id,
block_id,
block,
serialized,
})
.await
.expect("Couldn't send block to protocol.");
Expand Down
8 changes: 4 additions & 4 deletions massa-protocol-exports/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::{
use massa_hash::hash::Hash;
use massa_models::node::NodeId;
use massa_models::signed::{Signable, Signed};
use massa_models::SerializeCompact;
use massa_models::{
storage::Storage, Address, Amount, Block, BlockHeader, BlockId, SignedEndorsement,
SignedOperation, Slot,
Address, Amount, Block, BlockHeader, BlockId, SignedEndorsement, SignedOperation, Slot,
};
use massa_models::{Endorsement, Operation, OperationType};
use massa_network_exports::NetworkCommand;
Expand Down Expand Up @@ -132,13 +132,13 @@ pub async fn send_and_propagate_block(
valid: bool,
source_node_id: NodeId,
protocol_event_receiver: &mut ProtocolEventReceiver,
storage: Option<Storage>,
) {
let expected_hash = block.header.content.compute_id().unwrap();
let serialized = block.to_bytes_compact().unwrap();

// Send block to protocol.
network_controller
.send_block(source_node_id, expected_hash, storage.map(|s| (block, s)))
.send_block(source_node_id, block, serialized)
.await;

// Check protocol sends block to consensus.
Expand Down
29 changes: 15 additions & 14 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use massa_models::{
node::NodeId,
prehash::{BuildMap, Map, Set},
signed::Signable,
Address, BlockId, EndorsementId, OperationId, OperationType, SignedEndorsement, SignedHeader,
SignedOperation,
Address, Block, BlockId, EndorsementId, OperationId, OperationType, SignedEndorsement,
SignedHeader, SignedOperation,
};
use massa_network_exports::{NetworkCommandSender, NetworkEvent, NetworkEventReceiver};
use massa_protocol_exports::{
Expand Down Expand Up @@ -1125,7 +1125,7 @@ impl ProtocolWorker {
/// - Check root hash.
async fn note_block_from_node(
&mut self,
block: &BlockId,
block: &Block,
source_node_id: &NodeId,
) -> Result<
Option<(
Expand All @@ -1138,13 +1138,11 @@ impl ProtocolWorker {
massa_trace!("protocol.protocol_worker.note_block_from_node", { "node": source_node_id, "block": block });

let (header, operations, operation_merkle_root, slot) = {
let stored_block = self.storage.retrieve_block(block).unwrap();
let stored_block = stored_block.read();
(
stored_block.block.header.clone(),
stored_block.block.operations.clone(),
stored_block.block.header.content.operation_merkle_root,
stored_block.block.header.content.slot,
block.header.clone(),
block.operations.clone(),
block.header.content.operation_merkle_root,
block.header.content.slot,
)
};

Expand Down Expand Up @@ -1390,14 +1388,17 @@ impl ProtocolWorker {
}
NetworkEvent::ReceivedBlock {
node: from_node_id,
block_id,
block,
serialized,
} => {
massa_trace!("protocol.protocol_worker.on_network_event.received_block", { "node": from_node_id, "block_id": block_id});

// TODO: remove clone of block.
massa_trace!("protocol.protocol_worker.on_network_event.received_block", { "node": from_node_id, "block": block});
if let Some((block_id, operation_set, endorsement_ids)) =
self.note_block_from_node(&block_id, &from_node_id).await?
self.note_block_from_node(&block, &from_node_id).await?
{
// Store block in shared storage.
self.storage
.store_block(block_id, block.clone(), serialized);

let mut set = Set::<BlockId>::with_capacity_and_hasher(1, BuildMap::default());
set.insert(block_id);
self.stop_asking_blocks(set)?;
Expand Down
16 changes: 7 additions & 9 deletions massa-protocol-worker/src/tests/ask_block_scenarios.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022 MASSA LABS <[email protected]>

use super::tools::{protocol_test, protocol_test_with_storage};
use super::tools::protocol_test;
use massa_models::prehash::Set;
use massa_models::signed::Signable;
use massa_models::BlockId;
Expand All @@ -16,14 +16,13 @@ async fn test_without_a_priori() {
// start
let protocol_settings = &tools::PROTOCOL_SETTINGS;

protocol_test_with_storage(
protocol_test(
protocol_settings,
async move |mut network_controller,
protocol_event_receiver,
mut protocol_command_sender,
protocol_manager,
protocol_pool_event_receiver,
storage| {
protocol_pool_event_receiver| {
let node_a = tools::create_and_connect_nodes(1, &mut network_controller)
.await
.pop()
Expand Down Expand Up @@ -57,7 +56,7 @@ async fn test_without_a_priori() {

// node B replied with the block
network_controller
.send_block(node_b.id, hash_1, Some((block, storage)))
.send_block(node_b.id, block, Default::default())
.await;

// 7. Make sure protocol did not send additional ask for block commands.
Expand Down Expand Up @@ -91,14 +90,13 @@ async fn test_without_a_priori() {
async fn test_someone_knows_it() {
// start
let protocol_settings = &tools::PROTOCOL_SETTINGS;
protocol_test_with_storage(
protocol_test(
protocol_settings,
async move |mut network_controller,
mut protocol_event_receiver,
mut protocol_command_sender,
protocol_manager,
protocol_pool_event_receiver,
storage| {
protocol_pool_event_receiver| {
let node_a = tools::create_and_connect_nodes(1, &mut network_controller)
.await
.pop()
Expand Down Expand Up @@ -140,7 +138,7 @@ async fn test_someone_knows_it() {

// node C replied with the block
network_controller
.send_block(node_c.id, hash_1, Some((block, storage)))
.send_block(node_c.id, block, Default::default())
.await;

// 7. Make sure protocol did not send additional ask for block commands.
Expand Down
17 changes: 4 additions & 13 deletions massa-protocol-worker/src/tests/ban_nodes_scenarios.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022 MASSA LABS <[email protected]>

use super::tools::{protocol_test, protocol_test_with_storage};
use super::tools::protocol_test;
use massa_models::prehash::{Map, Set};
use massa_models::signed::Signable;
use massa_models::{BlockId, Slot};
Expand All @@ -15,14 +15,13 @@ use std::time::Duration;
#[serial]
async fn test_protocol_bans_node_sending_block_with_invalid_signature() {
let protocol_settings = &tools::PROTOCOL_SETTINGS;
protocol_test_with_storage(
protocol_test(
protocol_settings,
async move |mut network_controller,
mut protocol_event_receiver,
protocol_command_sender,
protocol_manager,
protocol_pool_event_receiver,
storage| {
protocol_pool_event_receiver| {
// Create 1 node.
let mut nodes = tools::create_and_connect_nodes(1, &mut network_controller).await;

Expand All @@ -36,15 +35,7 @@ async fn test_protocol_bans_node_sending_block_with_invalid_signature() {

// 3. Send block to protocol.
network_controller
.send_block(
creator_node.id,
block
.header
.content
.compute_id()
.expect("Fail to compute block id"),
Some((block, storage)),
)
.send_block(creator_node.id, block, Default::default())
.await;

// The node is banned.
Expand Down
18 changes: 4 additions & 14 deletions massa-protocol-worker/src/tests/cache_scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

// RUST_BACKTRACE=1 cargo test test_one_handshake -- --nocapture --test-threads=1

use super::tools::protocol_test_with_storage;
use massa_models::signed::Signable;
use super::tools::protocol_test;
use massa_models::{self, Address, Slot};
use massa_protocol_exports::tests::tools;
use massa_protocol_exports::ProtocolEvent;
Expand All @@ -26,14 +25,13 @@ lazy_static::lazy_static! {
async fn test_noting_block_does_not_panic_with_zero_max_node_known_blocks_size() {
let protocol_settings = &CUSTOM_PROTOCOL_SETTINGS;

protocol_test_with_storage(
protocol_test(
protocol_settings,
async move |mut network_controller,
mut protocol_event_receiver,
protocol_command_sender,
protocol_manager,
protocol_pool_event_receiver,
storage| {
protocol_pool_event_receiver| {
// Create 1 node.
let nodes = tools::create_and_connect_nodes(1, &mut network_controller).await;

Expand All @@ -54,15 +52,7 @@ async fn test_noting_block_does_not_panic_with_zero_max_node_known_blocks_size()
// and of its header,
// does not panic.
network_controller
.send_block(
nodes[0].id,
block
.header
.content
.compute_id()
.expect("Fail to compute block id"),
Some((block, storage)),
)
.send_block(nodes[0].id, block, Default::default())
.await;

// Wait for the event, should not panic.
Expand Down
Loading

0 comments on commit 351fde1

Please sign in to comment.