From 12652ba57c2430a074de54e6a8e06e51b0ab28fd Mon Sep 17 00:00:00 2001 From: eitanm-starkware <144585602+eitanm-starkware@users.noreply.github.com> Date: Tue, 16 Jul 2024 10:35:53 +0300 Subject: [PATCH] refactor(network): move callback from response_receiver to query_sender (#2213) --- .../src/network_manager/mod.rs | 136 +++++++++++------- .../src/network_manager/test.rs | 5 +- crates/papyrus_p2p_sync/src/client/header.rs | 3 +- .../src/client/header_test.rs | 50 +++---- crates/papyrus_p2p_sync/src/client/mod.rs | 22 +-- .../papyrus_p2p_sync/src/client/state_diff.rs | 2 +- .../src/client/state_diff_test.rs | 55 +++---- .../src/client/stream_builder.rs | 10 +- .../papyrus_p2p_sync/src/client/test_utils.rs | 7 +- .../sequencing/papyrus_consensus/src/lib.rs | 6 +- .../src/papyrus_consensus_context.rs | 6 +- 11 files changed, 167 insertions(+), 135 deletions(-) diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index d0b15991da..18ec50a981 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -42,13 +42,13 @@ pub struct GenericNetworkManager { // Splitting the response receivers from the query senders in order to poll all // receivers simultaneously. // Each receiver has a matching sender and vice versa (i.e the maps have the same keys). - sqmr_outbound_query_receivers: StreamHashMap>, - sqmr_outbound_response_senders: HashMap>, + sqmr_outbound_query_receivers: StreamHashMap>, + sqmr_outbound_response_senders: HashMap>, // Splitting the broadcast receivers from the broadcasted senders in order to poll all // receivers simultaneously. // Each receiver has a matching sender and vice versa (i.e the maps have the same keys). messages_to_broadcast_receivers: StreamHashMap>, - broadcasted_messages_senders: HashMap>, + broadcasted_messages_senders: HashMap>, outbound_session_id_to_protocol: HashMap, reported_peer_receivers: FuturesUnordered>>, // Fields for metrics @@ -62,8 +62,8 @@ impl GenericNetworkManager { tokio::select! 
{ Some(event) = self.swarm.next() => self.handle_swarm_event(event), Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res), - Some((protocol, query)) = self.sqmr_outbound_query_receivers.next() => { - self.handle_local_sqmr_query(protocol, query) + Some((protocol, (query, report_receiver))) = self.sqmr_outbound_query_receivers.next() => { + self.handle_local_sqmr_query(protocol, query, report_receiver) } Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => { self.broadcast_message(message, topic_hash); @@ -155,12 +155,11 @@ impl GenericNetworkManager { panic!("Protocol '{}' has already been registered as a client.", protocol); } - let query_fn: fn(Query) -> Ready> = - |query| ready(Ok(Bytes::from(query))); + let query_fn: SendQueryConverterFn = + |(query, report_receiver)| ready(Ok((Bytes::from(query), report_receiver))); let query_sender = query_sender.with(query_fn); - let response_fn: ReceivedMessagesConverterFn = - |(x, report_callback)| (Response::try_from(x), report_callback); + let response_fn: ReceivedMessagesConverterFn = |x| Response::try_from(x); let response_receiver = response_receiver.map(response_fn); SqmrSubscriberChannels { query_sender, response_receiver } @@ -205,8 +204,8 @@ impl GenericNetworkManager { let messages_to_broadcast_sender = messages_to_broadcast_sender.with(messages_to_broadcast_fn); - let broadcasted_messages_fn: ReceivedMessagesConverterFn = - |(x, report_callback)| (T::try_from(x), report_callback); + let broadcasted_messages_fn: BroadcastReceivedMessagesConverterFn = + |(x, report_sender)| (T::try_from(x), report_sender); let broadcasted_messages_receiver = broadcasted_messages_receiver.map(broadcasted_messages_fn); @@ -354,7 +353,7 @@ impl GenericNetworkManager { sqmr::behaviour::ExternalEvent::ReceivedResponse { outbound_session_id, response, - peer_id, + peer_id: _peer_id, } => { trace!( "Received response from peer for session id: {outbound_session_id:?}. sending \ @@ -364,14 +363,12 @@ impl GenericNetworkManager { .outbound_session_id_to_protocol .get(&outbound_session_id) .expect("Received response from an unknown session id"); - let report_callback_sender = - self.create_external_callback_for_received_data(peer_id); if let Some(response_sender) = self.sqmr_outbound_response_senders.get_mut(protocol) { // TODO(shahak): Close the channel if the buffer is full. send_now( response_sender, - (response, report_callback_sender), + response, format!( "Received response for an outbound query while the buffer is full. \ Dropping it. Session: {outbound_session_id:?}" @@ -400,8 +397,16 @@ impl GenericNetworkManager { fn handle_gossipsub_behaviour_event(&mut self, event: gossipsub_impl::ExternalEvent) { match event { gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } => { - let report_callback_sender = - self.create_external_callback_for_received_data(originated_peer_id); + let (report_sender, report_receiver) = oneshot::channel::<()>(); + let peer_id = originated_peer_id; + let report_sender = Box::new(move || { + error!("Report sender was called for message from {peer_id:?}"); + let res = report_sender.send(()); + if let Err(e) = res { + error!("Failed to send report. 
Error: {e:?}"); + } + }); + self.handle_new_report_receiver(originated_peer_id, report_receiver); let Some(sender) = self.broadcasted_messages_senders.get_mut(&topic_hash) else { error!( "Received a message from a topic we're not subscribed to with hash \ @@ -409,7 +414,7 @@ impl GenericNetworkManager { ); return; }; - let send_result = sender.try_send((message, report_callback_sender)); + let send_result = sender.try_send((message, report_sender)); if let Err(e) = send_result { if e.is_disconnected() { panic!("Receiver was dropped. This should never happen.") @@ -446,7 +451,12 @@ impl GenericNetworkManager { }; } - fn handle_local_sqmr_query(&mut self, protocol: StreamProtocol, query: Bytes) { + fn handle_local_sqmr_query( + &mut self, + protocol: StreamProtocol, + query: Bytes, + report_receiver: ReportReceiver, + ) { match self.swarm.send_query(query, PeerId::random(), protocol.clone()) { Ok(outbound_session_id) => { debug!("Sent query to peer. outbound_session_id: {outbound_session_id:?}"); @@ -456,6 +466,23 @@ impl GenericNetworkManager { self.num_active_outbound_sessions as f64 ); self.outbound_session_id_to_protocol.insert(outbound_session_id, protocol); + // TODO(eitan): this match always results in error as the session isnt assigned yet. + // save map between outbound_session_id and report_receiver. once session is + // assigned call handle_new_report_receiver + match self + .swarm + .get_peer_id_from_session_id(SessionId::OutboundSessionId(outbound_session_id)) + { + Ok(peer_id) => { + self.handle_new_report_receiver(peer_id, report_receiver); + } + Err(e) => { + error!( + "Got a report before any message was received. Ignoring report. \ + Error: {e:?}" + ); + } + } } Err(e) => { info!( @@ -488,23 +515,15 @@ impl GenericNetworkManager { } } } - fn create_external_callback_for_received_data( - &self, - peer_id: PeerId, - ) -> Box { - let (report_callback_sender, report_callback_receiver) = oneshot::channel::<()>(); + fn handle_new_report_receiver(&self, peer_id: PeerId, report_receiver: oneshot::Receiver<()>) { self.reported_peer_receivers.push( - report_callback_receiver + report_receiver .map(move |result| match result { Ok(_) => Some(peer_id), Err(_) => None, }) .boxed(), ); - Box::new(move || { - // TODO(shahak): Check if we can panic in case of error. - let _ = report_callback_sender.send(()); - }) } } @@ -560,8 +579,8 @@ where |x| ready(Ok(Bytes::from(x))); let messages_to_broadcast_sender = messages_to_broadcast_sender.with(messages_to_broadcast_fn); - let broadcasted_messages_fn: ReceivedMessagesConverterFn = - |(x, report_callback)| (T::try_from(x), report_callback); + let broadcasted_messages_fn: BroadcastReceivedMessagesConverterFn = + |(x, report_sender)| (T::try_from(x), report_sender); let broadcasted_messages_receiver = broadcasted_messages_receiver.map(broadcasted_messages_fn); let subscriber_channels = @@ -590,23 +609,27 @@ where } #[cfg(feature = "testing")] -pub fn dummy_report_callback() -> ReportCallback { +pub fn dummy_report_sender() -> ReportSender { Box::new(|| {}) } // TODO(shahak): Create a custom struct if Box dyn becomes an overhead. -pub type ReportCallback = Box; +// TODO(eitan): Change type to oneshot::Sender<()> +pub type ReportSender = Box; +pub type ReportReceiver = oneshot::Receiver<()>; -// TODO(shahak): Add report callback. +// TODO(shahak): Add report sender. 
pub type SqmrQueryReceiver = Map)>, ReceivedQueryConverterFn>; type ReceivedQueryConverterFn = fn( (Bytes, Sender), - ) -> (Result>::Error>, SubscriberSender); + ) + -> (Result>::Error>, BroadcastSubscriberSender); -pub type SubscriberSender = With< +// TODO(eitan): improve naming of final channel types +pub type BroadcastSubscriberSender = With< Sender, Bytes, T, @@ -614,35 +637,50 @@ pub type SubscriberSender = With< fn(T) -> Ready>, >; +pub type SqmrSubscriberSender = With< + Sender<(Bytes, ReportReceiver)>, + (Bytes, ReportReceiver), + (T, ReportReceiver), + Ready>, + fn((T, ReportReceiver)) -> Ready>, +>; + +pub type SendQueryConverterFn = + fn((Query, ReportReceiver)) -> Ready>; + // TODO(shahak): rename to ConvertFromBytesReceiver and add an alias called BroadcastReceiver -pub type SubscriberReceiver = - Map, ReceivedMessagesConverterFn>; +pub type SqmrSubscriberReceiver = Map, ReceivedMessagesConverterFn>; + +type ReceivedMessagesConverterFn = fn(Bytes) -> Result>::Error>; + +pub type BroadcastSubscriberReceiver = + Map, BroadcastReceivedMessagesConverterFn>; -type ReceivedMessagesConverterFn = - fn((Bytes, ReportCallback)) -> (Result>::Error>, ReportCallback); +type BroadcastReceivedMessagesConverterFn = + fn((Bytes, ReportSender)) -> (Result>::Error>, ReportSender); // TODO(shahak): Unite channels to a Sender of Query and Receiver of Responses. pub struct SqmrSubscriberChannels, Response: TryFrom> { - pub query_sender: SubscriberSender, - pub response_receiver: SubscriberReceiver, + pub query_sender: SqmrSubscriberSender, + pub response_receiver: SqmrSubscriberReceiver, } pub struct BroadcastSubscriberChannels> { - pub messages_to_broadcast_sender: SubscriberSender, - pub broadcasted_messages_receiver: SubscriberReceiver, + pub messages_to_broadcast_sender: BroadcastSubscriberSender, + pub broadcasted_messages_receiver: BroadcastSubscriberReceiver, } #[cfg(feature = "testing")] pub type MockBroadcastedMessagesSender = With< - Sender<(Bytes, ReportCallback)>, - (Bytes, ReportCallback), - (T, ReportCallback), - Ready>, + Sender<(Bytes, ReportSender)>, + (Bytes, ReportSender), + (T, ReportSender), + Ready>, MockBroadcastedMessagesFn, >; #[cfg(feature = "testing")] type MockBroadcastedMessagesFn = - fn((T, ReportCallback)) -> Ready>; + fn((T, ReportSender)) -> Ready>; #[cfg(feature = "testing")] pub type MockMessagesToBroadcastReceiver = Map, fn(Bytes) -> T>; #[cfg(feature = "testing")] diff --git a/crates/papyrus_network/src/network_manager/test.rs b/crates/papyrus_network/src/network_manager/test.rs index 44ea54c4e0..55213b6eb6 100644 --- a/crates/papyrus_network/src/network_manager/test.rs +++ b/crates/papyrus_network/src/network_manager/test.rs @@ -230,17 +230,18 @@ async fn register_sqmr_protocol_client_and_use_channels() { let response_receiver_collector = response_receiver .enumerate() .take(VEC1.len()) - .map(|(i, (result, _report_callback))| { + .map(|(i, result)| { let result = result.unwrap(); // this simulates how the mock swarm parses the query and sends responses to it assert_eq!(result, vec![VEC1[i]]); result }) .collect::>(); + let (_report_callback, report_receiver) = oneshot::channel::<()>(); tokio::select! 
{ _ = network_manager.run() => panic!("network manager ended"), _ = poll_fn(|cx| event_listner.poll_unpin(cx)).then(|_| async move { - query_sender.send(VEC1.clone()).await.unwrap()}) + query_sender.send((VEC1.clone(), report_receiver)).await.unwrap()}) .then(|_| async move { *cloned_response_receiver_length.lock().await = response_receiver_collector.await.len(); }) => {}, diff --git a/crates/papyrus_p2p_sync/src/client/header.rs b/crates/papyrus_p2p_sync/src/client/header.rs index 3a3ad1262f..32c2acc53c 100644 --- a/crates/papyrus_p2p_sync/src/client/header.rs +++ b/crates/papyrus_p2p_sync/src/client/header.rs @@ -44,8 +44,7 @@ impl DataStreamBuilder for HeaderStreamBuilder { _storage_reader: &'a StorageReader, ) -> BoxFuture<'a, Result, P2PSyncError>> { async move { - // TODO(shahak): Use the report callback. - let (maybe_signed_header, _report_callback) = + let maybe_signed_header = tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_receiver.next()) .await? .ok_or(P2PSyncError::ReceiverChannelTerminated { diff --git a/crates/papyrus_p2p_sync/src/client/header_test.rs b/crates/papyrus_p2p_sync/src/client/header_test.rs index dbebeb392d..38caa8cc2c 100644 --- a/crates/papyrus_p2p_sync/src/client/header_test.rs +++ b/crates/papyrus_p2p_sync/src/client/header_test.rs @@ -44,7 +44,7 @@ async fn signed_headers_basic_flow() { let end_block_number = (query_index + 1) * HEADER_QUERY_LENGTH; // Receive query and validate it. - let query = header_query_receiver.next().await.unwrap(); + let (query, _report_receiver) = header_query_receiver.next().await.unwrap(); assert_eq!( query, HeaderQuery(Query { @@ -63,18 +63,15 @@ async fn signed_headers_basic_flow() { { // Send responses headers_sender - .send(( - (Ok(DataOrFin(Some(SignedBlockHeader { - block_header: BlockHeader { - block_number: BlockNumber(i.try_into().unwrap()), - block_hash: *block_hash, - state_diff_length: Some(0), - ..Default::default() - }, - signatures: vec![*block_signature], - })))), - Box::new(|| {}), - )) + .send(Ok(DataOrFin(Some(SignedBlockHeader { + block_header: BlockHeader { + block_number: BlockNumber(i.try_into().unwrap()), + block_hash: *block_hash, + state_diff_length: Some(0), + ..Default::default() + }, + signatures: vec![*block_signature], + })))) .await .unwrap(); @@ -93,7 +90,7 @@ async fn signed_headers_basic_flow() { txn.get_block_signature(block_number).unwrap().unwrap(); assert_eq!(*block_signature, actual_block_signature); } - headers_sender.send((Ok(DataOrFin(None)), Box::new(|| {}))).await.unwrap(); + headers_sender.send(Ok(DataOrFin(None))).await.unwrap(); } }; @@ -128,25 +125,22 @@ async fn sync_sends_new_header_query_if_it_got_partial_responses() { for (i, (block_hash, signature)) in block_hashes_and_signatures.into_iter().enumerate() { headers_sender - .send(( - Ok(DataOrFin(Some(SignedBlockHeader { - block_header: BlockHeader { - block_number: BlockNumber(i.try_into().unwrap()), - block_hash, - state_diff_length: Some(0), - ..Default::default() - }, - signatures: vec![signature], - }))), - Box::new(|| {}), - )) + .send(Ok(DataOrFin(Some(SignedBlockHeader { + block_header: BlockHeader { + block_number: BlockNumber(i.try_into().unwrap()), + block_hash, + state_diff_length: Some(0), + ..Default::default() + }, + signatures: vec![signature], + })))) .await .unwrap(); } - headers_sender.send((Ok(DataOrFin(None)), Box::new(|| {}))).await.unwrap(); + headers_sender.send(Ok(DataOrFin(None))).await.unwrap(); // First unwrap is for the timeout. Second unwrap is for the Option returned from Stream. 
- let query = + let (query, _report_receiver) = timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, header_query_receiver.next()) .await .unwrap() diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index c08b32ca9a..c62cecff13 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -19,7 +19,7 @@ use header::HeaderStreamBuilder; use papyrus_config::converters::deserialize_seconds_to_duration; use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; -use papyrus_network::network_manager::ReportCallback; +use papyrus_network::network_manager::ReportReceiver; use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{ DataOrFin, @@ -159,15 +159,16 @@ pub enum P2PSyncError { SendError(#[from] SendError), } -type Response = (Result, ProtobufConversionError>, ReportCallback); +type Response = Result, ProtobufConversionError>; // TODO(Eitan): Use SqmrSubscriberChannels once there is a utility function for testing -type QuerySender = Box + Unpin + Send + 'static>; +type QuerySender = + Box + Unpin + Send + 'static>; type WithQuerySender = With< QuerySender, - T, - Query, - Ready>, - fn(Query) -> Ready>, + (T, ReportReceiver), + (Query, ReportReceiver), + Ready>, + fn((Query, ReportReceiver)) -> Ready>, >; type ResponseReceiver = Box> + Unpin + Send + 'static>; type HeaderQuerySender = QuerySender; @@ -193,7 +194,8 @@ impl P2PSyncClientChannels { config: P2PSyncClientConfig, ) -> impl Stream + Send + 'static { let header_stream = HeaderStreamBuilder::create_stream( - self.header_query_sender.with(|query| ready(Ok(HeaderQuery(query)))), + self.header_query_sender + .with(|(query, report_receiver)| ready(Ok((HeaderQuery(query), report_receiver)))), self.header_response_receiver, storage_reader.clone(), config.wait_period_for_new_data, @@ -202,7 +204,9 @@ impl P2PSyncClientChannels { ); let state_diff_stream = StateDiffStreamBuilder::create_stream( - self.state_diff_query_sender.with(|query| ready(Ok(StateDiffQuery(query)))), + self.state_diff_query_sender.with(|(query, report_receiver)| { + ready(Ok((StateDiffQuery(query), report_receiver))) + }), self.state_diff_response_receiver, storage_reader.clone(), config.wait_period_for_new_data, diff --git a/crates/papyrus_p2p_sync/src/client/state_diff.rs b/crates/papyrus_p2p_sync/src/client/state_diff.rs index 9286ff423b..ab31de6e4d 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff.rs @@ -53,7 +53,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { })?; while current_state_diff_len < target_state_diff_len { - let (maybe_state_diff_chunk, _report_callback) = + let maybe_state_diff_chunk = tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diff_chunks_receiver.next()) .await? 
.ok_or(P2PSyncError::ReceiverChannelTerminated { diff --git a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs index 17d840c24f..78e4b754a2 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs @@ -79,18 +79,15 @@ async fn state_diff_basic_flow() { { // Send responses headers_sender - .send(( - Ok(DataOrFin(Some(SignedBlockHeader { - block_header: BlockHeader { - block_number: BlockNumber(i.try_into().unwrap()), - block_hash: *block_hash, - state_diff_length: Some(state_diff.len()), - ..Default::default() - }, - signatures: vec![*block_signature], - }))), - Box::new(|| {}), - )) + .send(Ok(DataOrFin(Some(SignedBlockHeader { + block_header: BlockHeader { + block_number: BlockNumber(i.try_into().unwrap()), + block_hash: *block_hash, + state_diff_length: Some(state_diff.len()), + ..Default::default() + }, + signatures: vec![*block_signature], + })))) .await .unwrap(); } @@ -99,7 +96,7 @@ async fn state_diff_basic_flow() { (STATE_DIFF_QUERY_LENGTH, HEADER_QUERY_LENGTH - STATE_DIFF_QUERY_LENGTH), ] { // Get a state diff query and validate it - let query = state_diff_query_receiver.next().await.unwrap(); + let (query, _report_receiver) = state_diff_query_receiver.next().await.unwrap(); assert_eq!( query, StateDiffQuery(Query { @@ -120,7 +117,7 @@ async fn state_diff_basic_flow() { assert_eq!(block_number, txn.get_state_marker().unwrap()); state_diffs_sender - .send((Ok(DataOrFin(Some(state_diff_chunk.clone()))), Box::new(|| {}))) + .send(Ok(DataOrFin(Some(state_diff_chunk.clone())))) .await .unwrap(); @@ -167,7 +164,7 @@ async fn state_diff_basic_flow() { }; assert_eq!(state_diff, expected_state_diff); } - state_diffs_sender.send((Ok(DataOrFin(None)), Box::new(|| {}))).await.unwrap(); + state_diffs_sender.send(Ok(DataOrFin(None))).await.unwrap(); } }; @@ -326,23 +323,20 @@ async fn validate_state_diff_fails( let parse_queries_future = async move { // Send a single header. There's no need to fill the entire query. 
headers_sender - .send(( - Ok(DataOrFin(Some(SignedBlockHeader { - block_header: BlockHeader { - block_number: BlockNumber(0), - block_hash, - state_diff_length: Some(state_diff_length_in_header), - ..Default::default() - }, - signatures: vec![block_signature], - }))), - Box::new(|| {}), - )) + .send(Ok(DataOrFin(Some(SignedBlockHeader { + block_header: BlockHeader { + block_number: BlockNumber(0), + block_hash, + state_diff_length: Some(state_diff_length_in_header), + ..Default::default() + }, + signatures: vec![block_signature], + })))) .await .unwrap(); // Get a state diff query and validate it - let query = state_diff_query_receiver.next().await.unwrap(); + let (query, _report_reciever) = state_diff_query_receiver.next().await.unwrap(); assert_eq!( query, StateDiffQuery(Query { @@ -359,10 +353,7 @@ async fn validate_state_diff_fails( let txn = storage_reader.begin_ro_txn().unwrap(); assert_eq!(0, txn.get_state_marker().unwrap().0); - state_diffs_sender - .send((Ok(DataOrFin(state_diff_chunk)), Box::new(|| {}))) - .await - .unwrap(); + state_diffs_sender.send(Ok(DataOrFin(state_diff_chunk))).await.unwrap(); } tokio::time::sleep(TIMEOUT_FOR_TEST).await; panic!("P2P sync did not receive error"); diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 6b2e6eb6f1..b31182ad59 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -2,6 +2,7 @@ use std::cmp::min; use std::time::Duration; use async_stream::stream; +use futures::channel::oneshot; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt}; @@ -83,14 +84,17 @@ where current_block_number.0, end_block_number, ); + // TODO(shahak): Use the report callback. + //TODO(Eitan): abstract report functionality to the channel struct + let (_report_sender, report_receiver) = oneshot::channel::<()>(); query_sender - .send( + .send(( Query { start_block: BlockHashOrNumber::Number(current_block_number), direction: Direction::Forward, limit, step: STEP, - }, + }, report_receiver,) ) .await?; @@ -122,7 +126,7 @@ where // Consume the None message signaling the end of the query. 
match data_receiver.next().await { - Some((Ok(DataOrFin(None)), _report_callback)) => { + Some(Ok(DataOrFin(None))) => { debug!("Query sent to network for {:?} finished", Self::TYPE_DESCRIPTION); }, Some(_) => Err(P2PSyncError::TooManyResponses)?, diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index 99e23cf007..6dd6e01fbb 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -2,6 +2,7 @@ use std::time::Duration; use futures::channel::mpsc::{Receiver, Sender}; use lazy_static::lazy_static; +use papyrus_network::network_manager::ReportReceiver; use papyrus_protobuf::sync::{ HeaderQuery, SignedBlockHeader, @@ -42,10 +43,10 @@ pub struct TestArgs { #[allow(clippy::type_complexity)] pub p2p_sync: P2PSyncClient, pub storage_reader: StorageReader, - pub header_query_receiver: Receiver, - pub state_diff_query_receiver: Receiver, + pub header_query_receiver: Receiver<(HeaderQuery, ReportReceiver)>, + pub state_diff_query_receiver: Receiver<(StateDiffQuery, ReportReceiver)>, #[allow(dead_code)] - pub transaction_query_receiver: Receiver, + pub transaction_query_receiver: Receiver<(TransactionQuery, ReportReceiver)>, pub headers_sender: Sender>, pub state_diffs_sender: Sender>, #[allow(dead_code)] diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs index 3e566140d1..4b8f09ad85 100644 --- a/crates/sequencing/papyrus_consensus/src/lib.rs +++ b/crates/sequencing/papyrus_consensus/src/lib.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use futures::channel::{mpsc, oneshot}; use papyrus_common::metrics as papyrus_metrics; -use papyrus_network::network_manager::SubscriberReceiver; +use papyrus_network::network_manager::BroadcastSubscriberReceiver; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use single_height_consensus::SingleHeightConsensus; use starknet_api::block::{BlockHash, BlockNumber}; @@ -41,7 +41,7 @@ async fn run_height( context: Arc>, height: BlockNumber, validator_id: ValidatorId, - network_receiver: &mut SubscriberReceiver, + network_receiver: &mut BroadcastSubscriberReceiver, cached_messages: &mut Vec, ) -> Result, ConsensusError> where @@ -105,7 +105,7 @@ pub async fn run_consensus( context: Arc>, start_height: BlockNumber, validator_id: ValidatorId, - mut network_receiver: SubscriberReceiver, + mut network_receiver: BroadcastSubscriberReceiver, ) -> Result<(), ConsensusError> where ProposalWrapper: diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index b2f5b216f0..8d419694fc 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::sink::SinkExt; use futures::StreamExt; -use papyrus_network::network_manager::SubscriberSender; +use papyrus_network::network_manager::BroadcastSubscriberSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; @@ -47,7 +47,7 @@ impl ConsensusBlock for PapyrusConsensusBlock { pub struct PapyrusConsensusContext { storage_reader: StorageReader, - broadcast_sender: Arc>>, + broadcast_sender: Arc>>, validators: Vec, } @@ -56,7 +56,7 @@ impl PapyrusConsensusContext { 
#[allow(dead_code)] pub fn new( storage_reader: StorageReader, - broadcast_sender: SubscriberSender, + broadcast_sender: BroadcastSubscriberSender, num_validators: u64, ) -> Self { Self {
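A minimal sketch of the channel shape this patch moves to, using plain `futures` channels and toy types rather than the real `papyrus_network` aliases (`SqmrSubscriberSender`, `SqmrSubscriberReceiver`, `ReportSender`, `ReportReceiver`): the subscriber now sends `(query, report_receiver)` on the query sender and gets bare `Result` responses back, keeping the matching report sender so it can flag the peer that answered. The `Bytes` alias, the `Result<Bytes, ()>` response type, the buffer sizes, and the `tokio`/`futures` dependencies are illustrative assumptions, not the crate's API.

```rust
// Hypothetical illustration of the query-side report channel; not papyrus_network code.
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};

type Bytes = Vec<u8>;

#[tokio::main]
async fn main() {
    // Query channel: carries the report receiver alongside the query bytes.
    let (mut query_sender, mut query_receiver) =
        mpsc::channel::<(Bytes, oneshot::Receiver<()>)>(1);
    // Response channel: carries only the (possibly failed) conversion result.
    let (mut response_sender, mut response_receiver) = mpsc::channel::<Result<Bytes, ()>>(1);

    // "Network manager" side: take the query, hold on to the report receiver, answer.
    let manager = tokio::spawn(async move {
        let (query, report_receiver) = query_receiver.next().await.unwrap();
        response_sender.send(Ok(query)).await.unwrap();
        // A resolved receiver means the subscriber reported the peer that served
        // this session; cancellation means no report was filed.
        if report_receiver.await.is_ok() {
            println!("peer reported for this session");
        }
    });

    // Subscriber side: create a report channel per query and keep the sender.
    let (report_sender, report_receiver) = oneshot::channel::<()>();
    query_sender.send((b"query".to_vec(), report_receiver)).await.unwrap();

    match response_receiver.next().await.unwrap() {
        Ok(bytes) => {
            println!("got response: {bytes:?}");
            // No complaint: dropping the sender cancels the report receiver.
            drop(report_sender);
        }
        Err(()) => {
            // On a malformed response, report the peer through the oneshot channel.
            let _ = report_sender.send(());
        }
    }
    manager.await.unwrap();
}
```

The design point this mirrors: by attaching the report receiver to the query at send time, the network manager can associate it with the peer once the outbound session is assigned (see the TODO in `handle_local_sqmr_query`), instead of minting a fresh callback for every received response; the broadcast path keeps the per-message pattern and still delivers a boxed `ReportSender` next to each received message.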