Skip to content

Commit

Permalink
use stream combinators to return stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Apr 18, 2022
1 parent 2a7c58e commit 8003d07
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 136 deletions.
74 changes: 10 additions & 64 deletions sui_core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::Stream;
use futures::{SinkExt, StreamExt};
use futures::stream::BoxStream;
use futures::StreamExt;
use std::io;
use sui_network::network::{parse_recv_bytes, NetworkClient};
use sui_network::network::NetworkClient;
use sui_network::transport::TcpDataStream;
use sui_types::batch::UpdateItem;
use sui_types::{error::SuiError, messages::*, serialize::*};

static MAX_ERRORS: i32 = 10;
pub(crate) static BUFFER_SIZE: usize = 100;

#[async_trait]
pub trait AuthorityAPI {
Expand Down Expand Up @@ -47,13 +45,14 @@ pub trait AuthorityAPI {
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError>;

/// Handle Batch information requests for this authority.
async fn handle_batch_streaming(
async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<Receiver<Result<BatchInfoResponseItem, SuiError>>, io::Error>;
) -> Result<BatchInfoResponseItemStream, io::Error>;
}

pub type BatchInfoResponseItemStream = BoxStream<'static, Result<BatchInfoResponseItem, SuiError>>;

#[derive(Clone)]
pub struct AuthorityClient(NetworkClient);

Expand Down Expand Up @@ -124,63 +123,10 @@ impl AuthorityAPI for AuthorityClient {
}

/// Handle Batch information requests for this authority.
async fn handle_batch_streaming(
&self,
request: BatchInfoRequest,
) -> Result<Receiver<Result<BatchInfoResponseItem, SuiError>>, io::Error> {
let (mut tx_output, tr_output) = channel(BUFFER_SIZE);
let mut tcp_stream = self
.0
.connect_for_stream(serialize_batch_request(&request))
.await?;

let mut error_count = 0;

// Check the messages from the inflight_stream receiver to ensure each message is a
// BatchInfoResponseItem, then send a Result<BatchInfoResponseItem, SuiError to the channel
// that was passed in. For each message, also check if we have reached the last batch in the
// request, and when we do, end the inflight stream task using tx_cancellation.
loop {
let next_data = tcp_stream.read_data().await.transpose();
let data_result = parse_recv_bytes(next_data);
match data_result.and_then(deserialize_batch_info) {
Ok(batch_info_response_item) => {
// send to the caller via the channel
let _ = tx_output.send(Ok(batch_info_response_item.clone())).await;

// check for ending conditions
match batch_info_response_item {
BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => {
if signed_batch.batch.next_sequence_number > request.end {
break;
}
}
BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest))) => {
if seq > request.end {
break;
}
}
}
}
Err(e) => {
let _ = tx_output.send(Result::Err(e)).await;
error_count += 1;
if error_count >= MAX_ERRORS {
break;
}
}
}
}
Ok(tr_output)
}
}

impl AuthorityClient {
/// Handle Batch information requests for this authority.
pub async fn handle_batch_streaming_as_stream(
async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<impl Stream<Item = Result<BatchInfoResponseItem, SuiError>>, io::Error> {
) -> Result<BatchInfoResponseItemStream, io::Error> {
let tcp_stream = self
.0
.connect_for_stream(serialize_batch_request(&request))
Expand Down Expand Up @@ -224,6 +170,6 @@ impl AuthorityClient {
};
futures::future::ready(flag)
});
Ok(stream)
Ok(Box::pin(stream))
}
}
116 changes: 63 additions & 53 deletions sui_core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority_client::{AuthorityAPI, BUFFER_SIZE};
use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::{SinkExt, StreamExt};
use futures::StreamExt;
use std::io;
use std::io::Error;
use sui_types::crypto::PublicKeyBytes;
use sui_types::{base_types::*, committee::*, fp_ensure};

use sui_types::batch::UpdateItem;
use sui_types::batch::{SignedBatch, TxSequenceNumber, UpdateItem};
use sui_types::{
error::{SuiError, SuiResult},
messages::*,
Expand Down Expand Up @@ -172,6 +170,38 @@ impl<C> SafeClient<C> {
Ok(())
}

fn check_update_item_batch_response(
&self,
request: BatchInfoRequest,
signed_batch: &SignedBatch,
) -> SuiResult {
signed_batch
.signature
.check(signed_batch, signed_batch.authority)?;

// ensure transactions enclosed match requested range
fp_ensure!(
signed_batch.batch.initial_sequence_number >= request.start
&& signed_batch.batch.next_sequence_number
<= (request.end + signed_batch.batch.size),
SuiError::ByzantineAuthoritySuspicion {
authority: self.address
}
);
// todo: ensure signature valid over the set of transactions in the batch
// todo: ensure signature valid over the hash of the previous batch
Ok(())
}

fn check_update_item_transaction_response(
&self,
_request: BatchInfoRequest,
_seq: &TxSequenceNumber,
_digest: &TransactionDigest,
) -> SuiResult {
todo!();
}

/// This function is used by the higher level authority logic to report an
/// error that could be due to this authority.
pub fn report_client_error(&self, _error: SuiError) {
Expand Down Expand Up @@ -265,63 +295,43 @@ where
}

/// Handle Batch information requests for this authority.
async fn handle_batch_streaming(
async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<Receiver<Result<BatchInfoResponseItem, SuiError>>, io::Error> {
let (mut tx_output, tr_output) = channel(BUFFER_SIZE);

let mut batch_info_items = self
) -> Result<BatchInfoResponseItemStream, io::Error> {
let batch_info_items = self
.authority_client
.handle_batch_streaming(request)
.handle_batch_stream(request.clone())
.await?;

if let Some(next_data) = batch_info_items.next().await {
match next_data {
Ok(batch_info_response_item) => {
// do security checks
match batch_info_response_item.clone() {
BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => {
// check signature of batch
let result = signed_batch
.signature
.check(&signed_batch, signed_batch.authority);
// todo: ensure signature valid over the set of transactions in the batch
// todo: ensure signature valid over the hash of the previous batch
// todo: sequence numbers of the transactions enclosed need to match requested
match result {
Ok(_) => {
let _ = tx_output.send(Ok(batch_info_response_item)).await;
}
Err(e) => {
let _ = tx_output.send(Err(e)).await;
}
}
let client = self.clone();
let stream = Box::pin(batch_info_items.then(move |batch_info_item| {
let req_clone = request.clone();
let client = client.clone();
async move {
match &batch_info_item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
if let Err(err) =
client.check_update_item_batch_response(req_clone, signed_batch)
{
client.report_client_error(err.clone());
return Err(err);
}
BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))) => {
// make transaction info request which checks transaction certificate
let transaction_info_request = TransactionInfoRequest {
transaction_digest: digest,
};
let transaction_info_response = self
.handle_transaction_info_request(transaction_info_request)
.await;
match transaction_info_response {
Ok(_) => {
let _ = tx_output.send(Ok(batch_info_response_item)).await;
}
Err(e) => {
let _ = tx_output.send(Err(e)).await;
}
}
batch_info_item
}
Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest)))) => {
if let Err(err) =
client.check_update_item_transaction_response(req_clone, seq, digest)
{
client.report_client_error(err.clone());
return Err(err);
}
batch_info_item
}
}
Err(e) => {
return Err(Error::new(std::io::ErrorKind::Other, e));
Err(e) => Err(e.clone()),
}
}
}
Ok(tr_output)
}));
Ok(Box::pin(stream))
}
}
32 changes: 13 additions & 19 deletions sui_core/src/unit_tests/gateway_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::same_item_push)] // get_key_pair returns random elements

use std::collections::VecDeque;
use std::fs;
use std::path::Path;
use std::{
Expand All @@ -13,9 +14,8 @@ use std::{
};

use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::lock::Mutex;
use futures::SinkExt;
use futures::stream;
use move_core_types::{account_address::AccountAddress, ident_str, identifier::Identifier};
use signature::Signer;
use typed_store::Map;
Expand All @@ -30,7 +30,7 @@ use sui_types::messages::Transaction;
use sui_types::object::{Data, Object, Owner, GAS_VALUE_FOR_TESTING};

use crate::authority::{AuthorityState, AuthorityStore};
use crate::authority_client::BUFFER_SIZE;
use crate::authority_client::BatchInfoResponseItemStream;
use crate::gateway_state::{GatewayAPI, GatewayState};

use super::*;
Expand Down Expand Up @@ -116,29 +116,23 @@ impl AuthorityAPI for LocalAuthorityClient {
}

/// Handle Batch information requests for this authority.
async fn handle_batch_streaming(
async fn handle_batch_stream(
&self,
request: BatchInfoRequest,
) -> Result<Receiver<Result<BatchInfoResponseItem, SuiError>>, io::Error> {
) -> Result<BatchInfoResponseItemStream, io::Error> {
let state = self.0.clone();
let (mut tx_output, tr_output) = channel(BUFFER_SIZE);

let update_items = state.lock().await.handle_batch_info_request(request).await;

match update_items {
Ok(t) => {
let mut deq = t.0;
while let Some(update_item) = deq.pop_front() {
let batch_info_response_item = BatchInfoResponseItem(update_item.clone());
let _ = tx_output.send(Ok(batch_info_response_item)).await;
}
let (items, _): (VecDeque<_>, VecDeque<_>) = update_items.into_iter().unzip();
let stream = stream::iter(items.into_iter()).then(|mut item| async move {
let i = item.pop_front();
match i {
Some(i) => Ok(BatchInfoResponseItem(i)),
None => Result::Err(SuiError::BatchErrorSender),
}
Err(e) => {
let err = std::io::Error::new(std::io::ErrorKind::Other, e);
return Err(err);
}
}
Ok(tr_output)
});
Ok(Box::pin(stream))
}
}

Expand Down

0 comments on commit 8003d07

Please sign in to comment.