Skip to content

Commit

Permalink
Try to fix caches
Browse files Browse the repository at this point in the history
  • Loading branch information
AurelienFT committed Jun 14, 2023
1 parent a1792de commit f8c281d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 15 deletions.
3 changes: 2 additions & 1 deletion massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub(crate) fn start_connectivity_thread(
channel_endorsements.1,
protocol_channels.endorsement_handler_retrieval.0,
protocol_channels.endorsement_handler_retrieval.1,
sender_endorsements_propagation_ext,
sender_endorsements_propagation_ext.clone(),
protocol_channels.endorsement_handler_propagation.1.clone(),
peer_management_handler.sender.command_sender.clone(),
massa_metrics.clone(),
Expand All @@ -166,6 +166,7 @@ pub(crate) fn start_connectivity_thread(
protocol_channels.block_handler_propagation.1.clone(),
sender_blocks_propagation_ext,
sender_operations_propagation_ext,
sender_endorsements_propagation_ext,
peer_management_handler.sender.command_sender.clone(),
config.clone(),
endorsement_cache,
Expand Down
6 changes: 5 additions & 1 deletion massa-protocol-worker/src/handlers/block_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ pub use messages::{
};

use super::{
endorsement_handler::cache::SharedEndorsementCache,
endorsement_handler::{
cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerPropagationCommand,
},
operation_handler::{
cache::SharedOperationCache, commands_propagation::OperationHandlerPropagationCommand,
},
Expand All @@ -56,6 +58,7 @@ impl BlockHandler {
internal_receiver: MassaReceiver<BlockHandlerPropagationCommand>,
internal_sender: MassaSender<BlockHandlerPropagationCommand>,
sender_propagations_ops: MassaSender<OperationHandlerPropagationCommand>,
sender_propagations_endorsements: MassaSender<EndorsementHandlerPropagationCommand>,
peer_cmd_sender: MassaSender<PeerManagementCmd>,
config: ProtocolConfig,
endorsement_cache: SharedEndorsementCache,
Expand All @@ -73,6 +76,7 @@ impl BlockHandler {
receiver_ext,
internal_sender.clone(),
sender_propagations_ops,
sender_propagations_endorsements,
peer_cmd_sender.clone(),
config.clone(),
endorsement_cache,
Expand Down
137 changes: 126 additions & 11 deletions massa-protocol-worker/src/handlers/block_handler/retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{

use crate::{
handlers::{
endorsement_handler::cache::SharedEndorsementCache,
endorsement_handler::{
cache::SharedEndorsementCache,
commands_propagation::EndorsementHandlerPropagationCommand,
},
operation_handler::{
cache::SharedOperationCache, commands_propagation::OperationHandlerPropagationCommand,
},
Expand Down Expand Up @@ -34,6 +37,7 @@ use massa_models::{
operation::{OperationId, SecureShareOperation},
prehash::{CapacityAllocator, PreHashMap, PreHashSet},
secure_share::{Id, SecureShare},
slot::Slot,
timeslots::get_block_slot_timestamp,
};
use massa_pool_exports::PoolController;
Expand Down Expand Up @@ -96,6 +100,7 @@ pub struct RetrievalThread {
asked_blocks: HashMap<PeerId, PreHashMap<BlockId, Instant>>,
peer_cmd_sender: MassaSender<PeerManagementCmd>,
sender_propagation_ops: MassaSender<OperationHandlerPropagationCommand>,
sender_propagation_endorsements: MassaSender<EndorsementHandlerPropagationCommand>,
endorsement_cache: SharedEndorsementCache,
operation_cache: SharedOperationCache,
next_timer_ask_block: Instant,
Expand Down Expand Up @@ -516,7 +521,7 @@ impl RetrievalThread {
.get_or_insert(from_peer_id.clone(), || {
LruMap::new(ByLength::new(
self.config
.max_node_known_blocks_size
.max_node_known_endorsements_size
.try_into()
.expect("max_node_known_blocks_size in config must be > 0"),
))
Expand Down Expand Up @@ -588,7 +593,7 @@ impl RetrievalThread {
.get_or_insert(from_peer_id.clone(), || {
LruMap::new(ByLength::new(
self.config
.max_node_known_blocks_size
.max_node_known_endorsements_size
.try_into()
.expect("max_node_known_blocks_size in config must be > 0"),
))
Expand Down Expand Up @@ -638,7 +643,7 @@ impl RetrievalThread {
endorsements: Vec<SecureShareEndorsement>,
from_peer_id: &PeerId,
) -> Result<(), ProtocolError> {
massa_trace!("protocol.protocol_worker.note_endorsements_from_peer", { "peer": from_peer_id, "endorsements": endorsements});
massa_trace!("protocol.protocol_worker.note_endorsements_from_node", { "node": from_peer_id, "endorsements": endorsements});
let length = endorsements.len();
let mut new_endorsements = PreHashMap::with_capacity(length);
let mut endorsement_ids = PreHashSet::with_capacity(length);
Expand Down Expand Up @@ -672,7 +677,6 @@ impl RetrievalThread {
})
.collect::<Vec<_>>(),
)?;

'write_cache: {
let mut cache_write = self.endorsement_cache.write();
// add to verified signature cache
Expand Down Expand Up @@ -702,6 +706,49 @@ impl RetrievalThread {
if !new_endorsements.is_empty() {
let mut endorsements = self.storage.clone_without_refs();
endorsements.store_endorsements(new_endorsements.into_values().collect());

// Propagate endorsements
// Propagate endorsements when the slot of the block they endorse isn't `max_endorsements_propagation_time` old.
let mut endorsements_to_propagate = endorsements.clone();
let endorsements_to_not_propagate = {
let now = MassaTime::now()?;
let read_endorsements = endorsements_to_propagate.read_endorsements();
endorsements_to_propagate
.get_endorsement_refs()
.iter()
.filter_map(|endorsement_id| {
let slot_endorsed_block =
read_endorsements.get(endorsement_id).unwrap().content.slot;
let slot_timestamp = get_block_slot_timestamp(
self.config.thread_count,
self.config.t0,
self.config.genesis_timestamp,
slot_endorsed_block,
);
match slot_timestamp {
Ok(slot_timestamp) => {
if slot_timestamp
.saturating_add(self.config.max_endorsements_propagation_time)
< now
{
Some(*endorsement_id)
} else {
None
}
}
Err(_) => Some(*endorsement_id),
}
})
.collect()
};
endorsements_to_propagate.drop_endorsement_refs(&endorsements_to_not_propagate);
if let Err(err) = self.sender_propagation_endorsements.try_send(
EndorsementHandlerPropagationCommand::PropagateEndorsements(
endorsements_to_propagate,
),
) {
warn!("Failed to send from block retrieval thread of endorsement handler to propagation: {:?}", err);
}
// Add to pool
self.pool_controller.add_endorsements(endorsements);
}
Expand Down Expand Up @@ -1051,6 +1098,7 @@ impl RetrievalThread {
massa_trace!("protocol.protocol_worker.note_operations_from_peer", { "peer": source_peer_id, "operations": operations });
let length = operations.len();
let mut new_operations = PreHashMap::with_capacity(length);
let mut received_ids = PreHashSet::with_capacity(length);
for operation in operations {
let operation_id = operation.id;
if operation.serialized_size() > self.config.max_serialized_operations_size_per_block {
Expand All @@ -1061,6 +1109,7 @@ impl RetrievalThread {
self.config.max_serialized_operations_size_per_block
)));
};
received_ids.insert(operation_id);

// Check operation signature only if not already checked.
if self
Expand All @@ -1074,25 +1123,89 @@ impl RetrievalThread {
new_operations.insert(operation_id, operation);
};
}

// optimized signature verification
verify_sigs_batch(
&new_operations
.iter()
.map(|(op_id, op)| (*op_id.get_hash(), op.signature, op.content_creator_pub_key))
.collect::<Vec<_>>(),
)?;
{

'write_cache: {
// add to checked operations
let mut cache_write = self.operation_cache.write();
for op_id in new_operations.keys().copied() {
cache_write.insert_checked_operation(op_id);
}

// add to known ops
let Ok(known_ops) = cache_write
.ops_known_by_peer
.get_or_insert(source_peer_id.clone(), || {
LruMap::new(ByLength::new(
self.config
.max_node_known_ops_size
.try_into()
.expect("max_node_known_ops_size in config must be > 0"),
))
})
.ok_or(()) else {
warn!("ops_known_by_peer limitation reached");
break 'write_cache;
};
for id in received_ids {
known_ops.insert(id.prefix(), ());
}
}
self.sender_propagation_ops
.try_send(OperationHandlerPropagationCommand::AnnounceOperations(
new_operations.keys().copied().collect(),
))
.map_err(|err| ProtocolError::ChannelError(err.to_string()))?;

if !new_operations.is_empty() {
// Store operation, claim locally
let mut ops = self.storage.clone_without_refs();
ops.store_operations(new_operations.into_values().collect());

// Propagate operations when their expire period isn't `max_operations_propagation_time` old.
let mut ops_to_propagate = ops.clone();
let operations_to_not_propagate = {
let now = MassaTime::now()?;
let read_operations = ops_to_propagate.read_operations();
ops_to_propagate
.get_op_refs()
.iter()
.filter(|op_id| {
let expire_period =
read_operations.get(op_id).unwrap().content.expire_period;
let expire_period_timestamp = get_block_slot_timestamp(
self.config.thread_count,
self.config.t0,
self.config.genesis_timestamp,
Slot::new(expire_period, 0),
);
match expire_period_timestamp {
Ok(slot_timestamp) => {
slot_timestamp
.saturating_add(self.config.max_operations_propagation_time)
< now
}
Err(_) => true,
}
})
.copied()
.collect()
};
ops_to_propagate.drop_operation_refs(&operations_to_not_propagate);
let to_announce: PreHashSet<OperationId> =
ops_to_propagate.get_op_refs().iter().copied().collect();
self.storage.extend(ops_to_propagate);
self.sender_propagation_ops
.try_send(OperationHandlerPropagationCommand::AnnounceOperations(
to_announce,
))
.map_err(|err| ProtocolError::SendError(err.to_string()))?;
// Add to pool
self.pool_controller.add_operations(ops);
}

Ok(())
}

Expand Down Expand Up @@ -1345,6 +1458,7 @@ pub fn start_retrieval_thread(
receiver: MassaReceiver<BlockHandlerRetrievalCommand>,
_internal_sender: MassaSender<BlockHandlerPropagationCommand>,
sender_propagation_ops: MassaSender<OperationHandlerPropagationCommand>,
sender_propagation_endorsements: MassaSender<EndorsementHandlerPropagationCommand>,
peer_cmd_sender: MassaSender<PeerManagementCmd>,
config: ProtocolConfig,
endorsement_cache: SharedEndorsementCache,
Expand All @@ -1368,6 +1482,7 @@ pub fn start_retrieval_thread(
asked_blocks: HashMap::default(),
peer_cmd_sender,
sender_propagation_ops,
sender_propagation_endorsements,
receiver_network,
block_message_serializer,
receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ impl PropagationThread {
}
}
}
let endorsements_ids: PreHashSet<EndorsementId> = endorsements
let ids: PreHashSet<EndorsementId> = endorsements
.get_endorsement_refs()
.iter()
.copied()
.collect();
{
let mut cache_write = self.cache.write();
for endorsement_id in endorsements_ids.iter().copied() {
for endorsement_id in ids.iter().copied() {
cache_write.checked_endorsements.insert(endorsement_id, ());
}
// Add peers that potentially don't exist in cache
Expand Down

0 comments on commit f8c281d

Please sign in to comment.