From 8003d07e477d0bb99ea73c06744a42b282108c0a Mon Sep 17 00:00:00 2001 From: Laura Makdah Date: Mon, 4 Apr 2022 11:45:17 -0700 Subject: [PATCH] use stream combinators to return stream --- sui_core/src/authority_client.rs | 74 ++------------- sui_core/src/safe_client.rs | 116 ++++++++++++----------- sui_core/src/unit_tests/gateway_tests.rs | 32 +++---- 3 files changed, 86 insertions(+), 136 deletions(-) diff --git a/sui_core/src/authority_client.rs b/sui_core/src/authority_client.rs index 9f8a6cb1666fc..fa6a3151f6103 100644 --- a/sui_core/src/authority_client.rs +++ b/sui_core/src/authority_client.rs @@ -3,17 +3,15 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; -use futures::channel::mpsc::{channel, Receiver}; -use futures::Stream; -use futures::{SinkExt, StreamExt}; +use futures::stream::BoxStream; +use futures::StreamExt; use std::io; -use sui_network::network::{parse_recv_bytes, NetworkClient}; +use sui_network::network::NetworkClient; use sui_network::transport::TcpDataStream; use sui_types::batch::UpdateItem; use sui_types::{error::SuiError, messages::*, serialize::*}; static MAX_ERRORS: i32 = 10; -pub(crate) static BUFFER_SIZE: usize = 100; #[async_trait] pub trait AuthorityAPI { @@ -47,13 +45,14 @@ pub trait AuthorityAPI { request: TransactionInfoRequest, ) -> Result; - /// Handle Batch information requests for this authority. - async fn handle_batch_streaming( + async fn handle_batch_stream( &self, request: BatchInfoRequest, - ) -> Result>, io::Error>; + ) -> Result; } +pub type BatchInfoResponseItemStream = BoxStream<'static, Result>; + #[derive(Clone)] pub struct AuthorityClient(NetworkClient); @@ -124,63 +123,10 @@ impl AuthorityAPI for AuthorityClient { } /// Handle Batch information requests for this authority. - async fn handle_batch_streaming( - &self, - request: BatchInfoRequest, - ) -> Result>, io::Error> { - let (mut tx_output, tr_output) = channel(BUFFER_SIZE); - let mut tcp_stream = self - .0 - .connect_for_stream(serialize_batch_request(&request)) - .await?; - - let mut error_count = 0; - - // Check the messages from the inflight_stream receiver to ensure each message is a - // BatchInfoResponseItem, then send a Result { - // send to the caller via the channel - let _ = tx_output.send(Ok(batch_info_response_item.clone())).await; - - // check for ending conditions - match batch_info_response_item { - BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { - if signed_batch.batch.next_sequence_number > request.end { - break; - } - } - BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest))) => { - if seq > request.end { - break; - } - } - } - } - Err(e) => { - let _ = tx_output.send(Result::Err(e)).await; - error_count += 1; - if error_count >= MAX_ERRORS { - break; - } - } - } - } - Ok(tr_output) - } -} - -impl AuthorityClient { - /// Handle Batch information requests for this authority. - pub async fn handle_batch_streaming_as_stream( + async fn handle_batch_stream( &self, request: BatchInfoRequest, - ) -> Result>, io::Error> { + ) -> Result { let tcp_stream = self .0 .connect_for_stream(serialize_batch_request(&request)) @@ -224,6 +170,6 @@ impl AuthorityClient { }; futures::future::ready(flag) }); - Ok(stream) + Ok(Box::pin(stream)) } } diff --git a/sui_core/src/safe_client.rs b/sui_core/src/safe_client.rs index d3f368b68e99f..7b774f99dd869 100644 --- a/sui_core/src/safe_client.rs +++ b/sui_core/src/safe_client.rs @@ -2,16 +2,14 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::authority_client::{AuthorityAPI, BUFFER_SIZE}; +use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream}; use async_trait::async_trait; -use futures::channel::mpsc::{channel, Receiver}; -use futures::{SinkExt, StreamExt}; +use futures::StreamExt; use std::io; -use std::io::Error; use sui_types::crypto::PublicKeyBytes; use sui_types::{base_types::*, committee::*, fp_ensure}; -use sui_types::batch::UpdateItem; +use sui_types::batch::{SignedBatch, TxSequenceNumber, UpdateItem}; use sui_types::{ error::{SuiError, SuiResult}, messages::*, @@ -172,6 +170,38 @@ impl SafeClient { Ok(()) } + fn check_update_item_batch_response( + &self, + request: BatchInfoRequest, + signed_batch: &SignedBatch, + ) -> SuiResult { + signed_batch + .signature + .check(signed_batch, signed_batch.authority)?; + + // ensure transactions enclosed match requested range + fp_ensure!( + signed_batch.batch.initial_sequence_number >= request.start + && signed_batch.batch.next_sequence_number + <= (request.end + signed_batch.batch.size), + SuiError::ByzantineAuthoritySuspicion { + authority: self.address + } + ); + // todo: ensure signature valid over the set of transactions in the batch + // todo: ensure signature valid over the hash of the previous batch + Ok(()) + } + + fn check_update_item_transaction_response( + &self, + _request: BatchInfoRequest, + _seq: &TxSequenceNumber, + _digest: &TransactionDigest, + ) -> SuiResult { + todo!(); + } + /// This function is used by the higher level authority logic to report an /// error that could be due to this authority. pub fn report_client_error(&self, _error: SuiError) { @@ -265,63 +295,43 @@ where } /// Handle Batch information requests for this authority. - async fn handle_batch_streaming( + async fn handle_batch_stream( &self, request: BatchInfoRequest, - ) -> Result>, io::Error> { - let (mut tx_output, tr_output) = channel(BUFFER_SIZE); - - let mut batch_info_items = self + ) -> Result { + let batch_info_items = self .authority_client - .handle_batch_streaming(request) + .handle_batch_stream(request.clone()) .await?; - if let Some(next_data) = batch_info_items.next().await { - match next_data { - Ok(batch_info_response_item) => { - // do security checks - match batch_info_response_item.clone() { - BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { - // check signature of batch - let result = signed_batch - .signature - .check(&signed_batch, signed_batch.authority); - // todo: ensure signature valid over the set of transactions in the batch - // todo: ensure signature valid over the hash of the previous batch - // todo: sequence numbers of the transactions enclosed need to match requested - match result { - Ok(_) => { - let _ = tx_output.send(Ok(batch_info_response_item)).await; - } - Err(e) => { - let _ = tx_output.send(Err(e)).await; - } - } + let client = self.clone(); + let stream = Box::pin(batch_info_items.then(move |batch_info_item| { + let req_clone = request.clone(); + let client = client.clone(); + async move { + match &batch_info_item { + Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => { + if let Err(err) = + client.check_update_item_batch_response(req_clone, signed_batch) + { + client.report_client_error(err.clone()); + return Err(err); } - BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))) => { - // make transaction info request which checks transaction certificate - let transaction_info_request = TransactionInfoRequest { - transaction_digest: digest, - }; - let transaction_info_response = self - .handle_transaction_info_request(transaction_info_request) - .await; - match transaction_info_response { - Ok(_) => { - let _ = tx_output.send(Ok(batch_info_response_item)).await; - } - Err(e) => { - let _ = tx_output.send(Err(e)).await; - } - } + batch_info_item + } + Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest)))) => { + if let Err(err) = + client.check_update_item_transaction_response(req_clone, seq, digest) + { + client.report_client_error(err.clone()); + return Err(err); } + batch_info_item } - } - Err(e) => { - return Err(Error::new(std::io::ErrorKind::Other, e)); + Err(e) => Err(e.clone()), } } - } - Ok(tr_output) + })); + Ok(Box::pin(stream)) } } diff --git a/sui_core/src/unit_tests/gateway_tests.rs b/sui_core/src/unit_tests/gateway_tests.rs index 8d328069be01d..696b01cc959e1 100644 --- a/sui_core/src/unit_tests/gateway_tests.rs +++ b/sui_core/src/unit_tests/gateway_tests.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::same_item_push)] // get_key_pair returns random elements +use std::collections::VecDeque; use std::fs; use std::path::Path; use std::{ @@ -13,9 +14,8 @@ use std::{ }; use async_trait::async_trait; -use futures::channel::mpsc::{channel, Receiver}; use futures::lock::Mutex; -use futures::SinkExt; +use futures::stream; use move_core_types::{account_address::AccountAddress, ident_str, identifier::Identifier}; use signature::Signer; use typed_store::Map; @@ -30,7 +30,7 @@ use sui_types::messages::Transaction; use sui_types::object::{Data, Object, Owner, GAS_VALUE_FOR_TESTING}; use crate::authority::{AuthorityState, AuthorityStore}; -use crate::authority_client::BUFFER_SIZE; +use crate::authority_client::BatchInfoResponseItemStream; use crate::gateway_state::{GatewayAPI, GatewayState}; use super::*; @@ -116,29 +116,23 @@ impl AuthorityAPI for LocalAuthorityClient { } /// Handle Batch information requests for this authority. - async fn handle_batch_streaming( + async fn handle_batch_stream( &self, request: BatchInfoRequest, - ) -> Result>, io::Error> { + ) -> Result { let state = self.0.clone(); - let (mut tx_output, tr_output) = channel(BUFFER_SIZE); let update_items = state.lock().await.handle_batch_info_request(request).await; - match update_items { - Ok(t) => { - let mut deq = t.0; - while let Some(update_item) = deq.pop_front() { - let batch_info_response_item = BatchInfoResponseItem(update_item.clone()); - let _ = tx_output.send(Ok(batch_info_response_item)).await; - } + let (items, _): (VecDeque<_>, VecDeque<_>) = update_items.into_iter().unzip(); + let stream = stream::iter(items.into_iter()).then(|mut item| async move { + let i = item.pop_front(); + match i { + Some(i) => Ok(BatchInfoResponseItem(i)), + None => Result::Err(SuiError::BatchErrorSender), } - Err(e) => { - let err = std::io::Error::new(std::io::ErrorKind::Other, e); - return Err(err); - } - } - Ok(tr_output) + }); + Ok(Box::pin(stream)) } }