diff --git a/network_utils/Cargo.toml b/network_utils/Cargo.toml index cd0c8cd866553..e6ffd204029eb 100644 --- a/network_utils/Cargo.toml +++ b/network_utils/Cargo.toml @@ -9,10 +9,12 @@ edition = "2021" [dependencies] bytes = "1.1.0" futures = "0.3.21" +async-trait = "0.1.52" log = "0.4.14" net2 = "0.2.37" tokio = { version = "1.17.0", features = ["full"] } tracing = { version = "0.1.31", features = ["log"] } +tokio-util = { version = "0.7.0", features = ["codec"] } sui-types = { path = "../sui_types" } diff --git a/network_utils/src/network.rs b/network_utils/src/network.rs index 4ff3c050e0945..6af18eb76b2a5 100644 --- a/network_utils/src/network.rs +++ b/network_utils/src/network.rs @@ -41,13 +41,16 @@ impl NetworkClient { } } - async fn send_recv_bytes_internal(&self, buf: Vec) -> Result, io::Error> { + async fn send_recv_bytes_internal(&self, buf: Vec) -> Result>, io::Error> { let address = format!("{}:{}", self.base_address, self.base_port); let mut stream = connect(address, self.buffer_size).await?; // Send message time::timeout(self.send_timeout, stream.write_data(&buf)).await??; // Wait for reply - time::timeout(self.recv_timeout, stream.read_data()).await? + time::timeout(self.recv_timeout, async { + stream.read_data().await.transpose() + }) + .await? } pub async fn send_recv_bytes(&self, buf: Vec) -> Result { @@ -55,7 +58,7 @@ impl NetworkClient { Err(error) => Err(SuiError::ClientIoError { error: format!("{}", error), }), - Ok(response) => { + Ok(Some(response)) => { // Parse reply match deserialize_message(&response[..]) { Ok(SerializedMessage::Error(error)) => Err(*error), @@ -64,6 +67,9 @@ impl NetworkClient { // _ => Err(SuiError::UnexpectedMessage), } } + Ok(None) => Err(SuiError::ClientIoError { + error: "Empty response from authority.".to_string(), + }), } } @@ -101,17 +107,17 @@ impl NetworkClient { info!("In flight {} Remaining {}", in_flight, requests.len()); } match time::timeout(self.recv_timeout, stream.read_data()).await { - Ok(Ok(buffer)) => { + Ok(Some(Ok(buffer))) => { in_flight -= 1; responses.push(Bytes::from(buffer)); } - Ok(Err(error)) => { - if error.kind() == io::ErrorKind::UnexpectedEof { - info!("Socket closed by server"); - return Ok(responses); - } + Ok(Some(Err(error))) => { error!("Received error response: {}", error); } + Ok(None) => { + info!("Socket closed by server"); + return Ok(responses); + } Err(error) => { error!( "Timeout while receiving response: {} (in flight: {})", diff --git a/network_utils/src/transport.rs b/network_utils/src/transport.rs index 75993a2d32cdf..ee56740d0ea2b 100644 --- a/network_utils/src/transport.rs +++ b/network_utils/src/transport.rs @@ -2,17 +2,19 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use futures::future; +use futures::{Sink, SinkExt, Stream, StreamExt}; use std::io::ErrorKind; -use std::{collections::HashMap, convert::TryInto, io, sync::Arc}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::sync::Arc; use tokio::net::TcpSocket; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::{TcpListener, TcpStream}, -}; +use tokio::net::{TcpListener, TcpStream}; + +use async_trait::async_trait; + use tracing::*; +use bytes::{Bytes, BytesMut}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + #[cfg(test)] #[path = "unit_tests/transport_tests.rs"] mod transport_tests; @@ -22,8 +24,28 @@ pub const DEFAULT_MAX_DATAGRAM_SIZE: usize = 65507; pub const DEFAULT_MAX_DATAGRAM_SIZE_STR: &str = "65507"; /// The handler required to create a service. -pub trait MessageHandler { - fn handle_message<'a>(&'a self, buffer: &'a [u8]) -> future::BoxFuture<'a, Option>>; +#[async_trait] +pub trait MessageHandler { + async fn handle_messages(&self, channel: A) -> (); +} + +/* + The RwChannel connects the low-level networking code here, that handles + TCP streams, ports, accept/connect, and sockets that provide AsyncRead / + AsyncWrite on byte streams, with the higher level logic in AuthorityServer + that handles sequences of Bytes / BytesMut, as framed messages, through + exposing a standard Stream and Sink trait. + + This separation allows us to change the details of the network, transport + and framing, without changing the authority code. It also allows us to test + the authority without using a real network. +*/ +pub trait RwChannel<'a> { + type R: 'a + Stream> + Unpin + Send; + type W: 'a + Sink + Unpin + Send; + + fn sink(&mut self) -> &mut Self::W; + fn stream(&mut self) -> &mut Self::R; } /// The result of spawning a server is oneshot channel to kill it and a handle to track completion. @@ -54,11 +76,6 @@ pub async fn connect( TcpDataStream::connect(address, max_data_size).await } -/// Create a DataStreamPool for this protocol. -pub async fn make_outgoing_connection_pool() -> Result { - TcpDataStreamPool::new().await -} - /// Run a server for this protocol and the given message handler. pub async fn spawn_server( address: &str, @@ -66,7 +83,7 @@ pub async fn spawn_server( buffer_size: usize, ) -> Result where - S: MessageHandler + Send + Sync + 'static, + S: MessageHandler + Send + Sync + 'static, { let (complete, receiver) = futures::channel::oneshot::channel(); let handle = { @@ -89,8 +106,7 @@ where /// An implementation of DataStream based on TCP. pub struct TcpDataStream { - stream: TcpStream, - max_data_size: usize, + framed: Framed, } impl TcpDataStream { @@ -103,95 +119,41 @@ impl TcpDataStream { socket.set_recv_buffer_size(max_data_size as u32)?; let stream = socket.connect(addr).await?; - Ok(Self { - stream, - max_data_size, - }) + Ok(TcpDataStream::from_tcp_stream(stream, max_data_size)) } - async fn tcp_write_data(stream: &mut S, buffer: &[u8]) -> Result<(), std::io::Error> - where - S: AsyncWrite + Unpin, - { - stream - .write_all(&u32::to_le_bytes( - buffer - .len() - .try_into() - .expect("length must not exceed u32::MAX"), - )) - .await?; - stream.write_all(buffer).await - } + fn from_tcp_stream(stream: TcpStream, max_data_size: usize) -> TcpDataStream { + let framed = Framed::new( + stream, + LengthDelimitedCodec::builder() + .max_frame_length(max_data_size) + .new_codec(), + ); - async fn tcp_read_data(stream: &mut S, max_size: usize) -> Result, std::io::Error> - where - S: AsyncRead + Unpin, - { - let mut size_buf = [0u8; 4]; - stream.read_exact(&mut size_buf).await?; - let size = u32::from_le_bytes(size_buf); - if size as usize > max_size { - return Err(io::Error::new( - io::ErrorKind::Other, - "Message size exceeds buffer size", - )); - } - let mut buf = vec![0u8; size as usize]; - stream.read_exact(&mut buf).await?; - Ok(buf) + Self { framed } } -} -impl TcpDataStream { + // TODO: Eliminate vecs and use Byte, ByteBuf + pub async fn write_data<'a>(&'a mut self, buffer: &'a [u8]) -> Result<(), std::io::Error> { - Self::tcp_write_data(&mut self.stream, buffer).await + self.framed.send(buffer.to_vec().into()).await } - pub async fn read_data(&mut self) -> Result, std::io::Error> { - Self::tcp_read_data(&mut self.stream, self.max_data_size).await + pub async fn read_data(&mut self) -> Option, std::io::Error>> { + let result = self.framed.next().await; + result.map(|v| v.map(|w| w.to_vec())) } } -/// An implementation of DataStreamPool based on TCP. -pub struct TcpDataStreamPool { - streams: HashMap, -} - -impl TcpDataStreamPool { - async fn new() -> Result { - let streams = HashMap::new(); - Ok(Self { streams }) - } +impl<'a> RwChannel<'a> for TcpDataStream { + type W = Framed; + type R = Framed; - async fn get_stream(&mut self, address: &str) -> Result<&mut TcpStream, io::Error> { - if !self.streams.contains_key(address) { - match TcpStream::connect(address).await { - Ok(s) => { - self.streams.insert(address.to_string(), s); - } - Err(error) => { - error!("Failed to open connection to {}: {}", address, error); - return Err(error); - } - }; - }; - Ok(self.streams.get_mut(address).unwrap()) + fn sink(&mut self) -> &mut Self::W { + &mut self.framed } -} - -impl TcpDataStreamPool { - pub async fn send_data_to<'a>( - &'a mut self, - buffer: &'a [u8], - address: &'a str, - ) -> Result<(), std::io::Error> { - let stream = self.get_stream(address).await?; - let result = TcpDataStream::tcp_write_data(stream, buffer).await; - if result.is_err() { - self.streams.remove(address); - } - result + fn stream(&mut self) -> &mut Self::R { + &mut self.framed } } @@ -200,44 +162,27 @@ async fn run_tcp_server( listener: TcpListener, state: S, mut exit_future: futures::channel::oneshot::Receiver<()>, - buffer_size: usize, + _buffer_size: usize, ) -> Result<(), std::io::Error> where - S: MessageHandler + Send + Sync + 'static, + S: MessageHandler + Send + Sync + 'static, { - let guarded_state = Arc::new(Box::new(state)); + let guarded_state = Arc::new(state); loop { - let (mut stream, _) = match future::select(exit_future, Box::pin(listener.accept())).await { - future::Either::Left(_) => break, - future::Either::Right((value, new_exit_future)) => { - exit_future = new_exit_future; - value? + let stream; + + tokio::select! { + _ = &mut exit_future => { break }, + result = listener.accept() => { + let (value, _addr) = result?; + stream = value; } - }; + } let guarded_state = guarded_state.clone(); tokio::spawn(async move { - loop { - let buffer = match TcpDataStream::tcp_read_data(&mut stream, buffer_size).await { - Ok(buffer) => buffer, - Err(err) => { - // We expect some EOF or disconnect error at the end. - if err.kind() != io::ErrorKind::UnexpectedEof - && err.kind() != io::ErrorKind::ConnectionReset - { - error!("Error while reading TCP stream: {}", err); - } - break; - } - }; - - if let Some(reply) = guarded_state.handle_message(&buffer[..]).await { - let status = TcpDataStream::tcp_write_data(&mut stream, &reply[..]).await; - if let Err(error) = status { - error!("Failed to send query response: {}", error); - } - }; - } + let framed = TcpDataStream::from_tcp_stream(stream, _buffer_size); + guarded_state.handle_messages(framed).await }); } Ok(()) diff --git a/network_utils/src/unit_tests/transport_tests.rs b/network_utils/src/unit_tests/transport_tests.rs index c49ddd8d604ab..a2b09a6421762 100644 --- a/network_utils/src/unit_tests/transport_tests.rs +++ b/network_utils/src/unit_tests/transport_tests.rs @@ -24,12 +24,39 @@ impl TestService { fn new(counter: Arc) -> Self { TestService { counter } } -} -impl MessageHandler for TestService { - fn handle_message<'a>(&'a self, buffer: &'a [u8]) -> future::BoxFuture<'a, Option>> { + async fn handle_one_message<'a>(&'a self, buffer: &'a [u8]) -> Option> { self.counter.fetch_add(buffer.len(), Ordering::Relaxed); - Box::pin(async move { Some(Vec::from(buffer)) }) + Some(Vec::from(buffer)) + } +} + +#[async_trait] +impl<'a, A> MessageHandler for TestService +where + A: 'static + RwChannel<'a> + Unpin + Send, +{ + async fn handle_messages(&self, mut channel: A) -> () { + loop { + let buffer = match channel.stream().next().await { + Some(Ok(buffer)) => buffer, + Some(Err(err)) => { + // We expect some EOF or disconnect error at the end. + error!("Error while reading TCP stream: {}", err); + break; + } + None => { + break; + } + }; + + if let Some(reply) = self.handle_one_message(&buffer[..]).await { + let status = channel.sink().send(reply.into()).await; + if let Err(error) = status { + error!("Failed to send query response: {}", error); + } + }; + } } } @@ -43,18 +70,15 @@ async fn test_server() -> Result<(usize, usize), std::io::Error> { let mut client = connect(address.clone(), 1000).await?; client.write_data(b"abcdef").await?; - received += client.read_data().await?.len(); + received += client.read_data().await.unwrap()?.len(); client.write_data(b"abcd").await?; - received += client.read_data().await?.len(); - - // Use a second connection (here pooled). - let mut pool = make_outgoing_connection_pool().await?; - pool.send_data_to(b"abc", &address).await?; + received += client.read_data().await.unwrap()?.len(); // Try to read data on the first connection (should fail). received += timeout(Duration::from_millis(500), client.read_data()) .await - .unwrap_or_else(|_| Ok(Vec::new()))? + .unwrap_or_else(|_| Some(Ok(Vec::new()))) + .unwrap()? .len(); // Attempt to gracefully kill server. @@ -65,7 +89,8 @@ async fn test_server() -> Result<(usize, usize), std::io::Error> { .unwrap_or(Ok(()))?; received += timeout(Duration::from_millis(500), client.read_data()) .await - .unwrap_or_else(|_| Ok(Vec::new()))? + .unwrap_or_else(|_| Some(Ok(Vec::new()))) + .unwrap()? .len(); Ok((counter.load(Ordering::Relaxed), received)) @@ -76,6 +101,6 @@ fn tcp_server() { let rt = Runtime::new().unwrap(); let (processed, received) = rt.block_on(test_server()).unwrap(); // Active TCP connections are allowed to finish before the server is gracefully killed. - assert_eq!(processed, 17); + assert_eq!(processed, 14); assert_eq!(received, 14); } diff --git a/sui_core/Cargo.toml b/sui_core/Cargo.toml index bfaf4707a69e0..80cb68e4a5e8c 100644 --- a/sui_core/Cargo.toml +++ b/sui_core/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0.55" bcs = "0.1.3" futures = "0.3.21" rand = "0.7.3" +bytes = "1.1.0" serde = { version = "1.0.136", features = ["derive"] } tokio = { version = "1.17.0", features = ["full"] } parking_lot = "0.12.0" @@ -22,7 +23,6 @@ tracing = { version = "0.1.31", features = ["log"] } signature = "1.5.0" ed25519-dalek = "1.0.1" structopt = "0.3.26" -bytes = "1.1.0" log = "0.4.14" sui-adapter = { path = "../sui_programmability/adapter" } diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index fb48be3457bca..9df2bd43f5a62 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -10,13 +10,14 @@ use move_core_types::{ }; use move_vm_runtime::native_functions::NativeFunctionTable; use std::{ - collections::{BTreeMap, BTreeSet, HashSet}, + collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, pin::Pin, sync::Arc, }; use sui_adapter::adapter; use sui_types::{ base_types::*, + batch::UpdateItem, committee::Committee, crypto::AuthoritySignature, error::{SuiError, SuiResult}, @@ -27,7 +28,7 @@ use sui_types::{ MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, }; -use crate::authority_batch::BatchSender; +use crate::authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender}; #[cfg(test)] #[path = "unit_tests/authority_tests.rs"] @@ -41,6 +42,7 @@ pub use authority_store::AuthorityStore; // based on https://github.com/diem/move/blob/62d48ce0d8f439faa83d05a4f5cd568d4bfcb325/language/tools/move-cli/src/sandbox/utils/mod.rs#L50 const MAX_GAS_BUDGET: u64 = 18446744073709551615 / 1000 - 1; +const MAX_ITEMS_LIMIT: u64 = 10_000; /// a Trait object for `signature::Signer` that is: /// - Pin, i.e. confined to one place in memory (we don't want to copy private keys). @@ -67,10 +69,11 @@ pub struct AuthorityState { /// The database _database: Arc, + // Structures needed for handling batching and notifications. /// The sender to notify of new transactions /// and create batches for this authority. /// Keep as None if there is no need for this. - batch_sender: Option, + batch_channels: Option<(BatchSender, BroadcastSender)>, } /// The authority state encapsulates all state, drives execution, and ensures safety. @@ -82,14 +85,28 @@ pub struct AuthorityState { impl AuthorityState { /// Set a listener for transaction certificate updates. Returns an /// error if a listener is already registered. - pub fn set_batch_sender(&mut self, batch_sender: BatchSender) -> SuiResult { - if self.batch_sender.is_some() { + pub fn set_batch_sender( + &mut self, + batch_sender: BatchSender, + broadcast_sender: BroadcastSender, + ) -> SuiResult { + if self.batch_channels.is_some() { return Err(SuiError::AuthorityUpdateFailure); } - self.batch_sender = Some(batch_sender); + self.batch_channels = Some((batch_sender, broadcast_sender)); Ok(()) } + /// Get a broadcast receiver for updates + pub fn subscribe(&self) -> Result { + self.batch_channels + .as_ref() + .map(|(_, tx)| tx.subscribe()) + .ok_or(SuiError::GenericAuthorityError { + error: "No broadcast subscriptions allowed for this authority.".to_string(), + }) + } + /// The logic to check one object against a reference, and return the object if all is well /// or an error if not. fn check_one_lock( @@ -415,7 +432,7 @@ impl AuthorityState { .await?; // Returns the OrderInfoResponse // If there is a notifier registered, notify: - if let Some(sender) = &self.batch_sender { + if let Some((sender, _)) = &self.batch_channels { sender.send_item(seq, transaction_digest).await?; } @@ -633,6 +650,66 @@ impl AuthorityState { }) } + /// Handles a request for a batch info. It returns a sequence of + /// [batches, transactions, batches, transactions] as UpdateItems, and a flag + /// that if true indicates the request goes beyond the last batch in the + /// database. + pub async fn handle_batch_info_request( + &self, + request: BatchInfoRequest, + ) -> Result<(VecDeque, bool), SuiError> { + // Ensure the range contains some elements and end > start + if request.end <= request.start { + return Err(SuiError::InvalidSequenceRangeError); + }; + + // Ensure we are not doing too much work per request + if request.end - request.start > MAX_ITEMS_LIMIT { + return Err(SuiError::TooManyItemsError(MAX_ITEMS_LIMIT)); + } + + let (batches, transactions) = self + ._database + .batches_and_transactions(request.start, request.end)?; + + let mut dq_batches = std::collections::VecDeque::from(batches); + let mut dq_transactions = std::collections::VecDeque::from(transactions); + let mut items = VecDeque::with_capacity(dq_batches.len() + dq_transactions.len()); + let mut last_batch_next_seq = 0; + + // Send full historical data as [Batch - Transactions - Batch - Transactions - Batch]. + while let Some(current_batch) = dq_batches.pop_front() { + // Get all transactions belonging to this batch and send them + loop { + // No more items or item too large for this batch + if dq_transactions.is_empty() + || dq_transactions[0].0 >= current_batch.batch.next_sequence_number + { + break; + } + + let current_transaction = dq_transactions.pop_front().unwrap(); + items.push_back(UpdateItem::Transaction(current_transaction)); + } + + // Now send the batch + last_batch_next_seq = current_batch.batch.next_sequence_number; + items.push_back(UpdateItem::Batch(current_batch)); + } + + // whether we have sent everything requested, or need to start + // live notifications. + let should_subscribe = request.end > last_batch_next_seq; + + // If any transactions are left they must be outside a batch + while let Some(current_transaction) = dq_transactions.pop_front() { + // Remember the last sequence sent + items.push_back(UpdateItem::Transaction(current_transaction)); + } + + Ok((items, should_subscribe)) + } + pub async fn new( committee: Committee, name: AuthorityName, @@ -651,7 +728,7 @@ impl AuthorityState { move_vm: adapter::new_move_vm(native_functions) .expect("We defined natives to not fail here"), _database: store, - batch_sender: None, + batch_channels: None, }; for genesis_modules in genesis_packages { @@ -663,11 +740,15 @@ impl AuthorityState { state } - #[cfg(test)] - pub fn db(&self) -> Arc { + pub(crate) fn db(&self) -> Arc { self._database.clone() } + #[cfg(test)] + pub(crate) fn batch_sender(&self) -> &BatchSender { + &self.batch_channels.as_ref().unwrap().0 + } + async fn get_object(&self, object_id: &ObjectID) -> Result, SuiError> { self._database.get_object(object_id) } diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index 4233f390f1ce7..50459ee2bcb89 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -9,14 +9,12 @@ use std::path::Path; use std::sync::atomic::AtomicU64; use sui_types::base_types::SequenceNumber; +use sui_types::batch::{SignedBatch, TxSequenceNumber}; use typed_store::rocks::{open_cf, DBBatch, DBMap}; use std::sync::atomic::Ordering; use typed_store::traits::Map; -pub use crate::authority_batch::AuthorityBatch; -use crate::authority_batch::{SignedBatch, TxSequenceNumber}; - pub struct AuthorityStore { /// This is a map between the object ID and the latest state of the object, namely the /// state that is needed to process new transactions. If an object is deleted its entry is @@ -635,6 +633,108 @@ impl AuthorityStore { write_batch = write_batch.insert_batch(&self.schedule, schedule_to_write)?; write_batch.write().map_err(SuiError::from) } + + /// Retrieves batches including transactions within a range. + /// + /// This function returns all signed batches that enclose the requested transaction + /// including the batch preceeding the first requested transaction, the batch including + /// the last requested transaction (if there is one) and all batches in between. + /// + /// Transactions returned include all transactions within the batch that include the + /// first requested transaction, all the way to at least all the transactions that are + /// included in the last batch returned. If the last requested transaction is outside a + /// batch (one has not yet been generated) the function returns all transactions at the + /// end of the sequence that are in TxSequenceOrder (and ignores any that are out of + /// order.) + #[allow(clippy::type_complexity)] + pub fn batches_and_transactions( + &self, + start: u64, + end: u64, + ) -> Result<(Vec, Vec<(TxSequenceNumber, TransactionDigest)>), SuiError> { + /* + Get all batches that include requested transactions. This includes the signed batch + prior to the first requested transaction, the batch including the last requested + transaction and all batches in between. + + So for example if we got a request for start: 3 end: 9 and we have: + B0 T0 T1 B2 T2 T3 B3 T3 T4 T5 B6 T6 T8 T9 + + This will return B2, B3, B6 + + + */ + let batches: Vec = self + .batches + .iter() + .skip_prior_to(&start)? + .take_while(|(_seq, batch)| batch.batch.initial_sequence_number < end) + .map(|(_, batch)| batch) + .collect(); + + /* + Get transactions in the retrieved batches. The first batch is included + without transactions, so get transactions of all subsequent batches, or + until the end of the sequence if the last batch does not contain the + requested end sequence number. + + So for example if we got a request for start: 3 end: 9 and we have: + B0 T0 T1 B2 T2 T3 B3 T3 T4 T5 B6 T6 T8 T9 + + The code below will return T2 .. T6 + + Note: T8 is out of order so the sequence returned ends at T6. + + */ + + let first_seq = batches + .first() + .ok_or(SuiError::NoBatchesFoundError)? + .batch + .next_sequence_number; + let mut last_seq = batches + .last() + .unwrap() // if the first exists the last exists too + .batch + .next_sequence_number; + + let mut in_sequence = last_seq; + let in_sequence_ptr = &mut in_sequence; + + if last_seq < end { + // This means that the request needs items beyond the end of the + // last batch, so we include all items. + last_seq = TxSequenceNumber::MAX; + } + + /* Since the database writes are asynchronous it may be the case that the tail end of the + sequence misses items. This will confuse calling logic, so we filter them out and allow + callers to use the subscription API to catch the latest items in order. */ + + let transactions: Vec<(TxSequenceNumber, TransactionDigest)> = self + .executed_sequence + .iter() + .skip_to(&first_seq)? + .take_while(|(seq, _tx)| { + // Before the end of the last batch we want everything. + if *seq < *in_sequence_ptr { + return true; + }; + + // After the end of the last batch we only take items in sequence. + if *seq < last_seq && *seq == *in_sequence_ptr { + *in_sequence_ptr += 1; + return true; + } + + // If too large or out of sequence after the last batch + // we stop taking items. + false + }) + .collect(); + + Ok((batches, transactions)) + } } impl ModuleResolver for AuthorityStore { diff --git a/sui_core/src/authority_batch.rs b/sui_core/src/authority_batch.rs index d65c70c390ff1..927a23c2ea093 100644 --- a/sui_core/src/authority_batch.rs +++ b/sui_core/src/authority_batch.rs @@ -2,14 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use crate::authority::{AuthorityStore, StableSyncAuthoritySigner}; -use serde::{Deserialize, Serialize}; use std::sync::Arc; use sui_types::base_types::*; +use sui_types::batch::*; use sui_types::error::{SuiError, SuiResult}; use std::collections::BTreeMap; use std::time::Duration; -use sui_types::crypto::{sha3_hash, AuthoritySignature, BcsSignable}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::interval; @@ -41,19 +40,10 @@ The architecture is as follows: */ -pub type TxSequenceNumber = u64; +pub type BroadcastSender = tokio::sync::broadcast::Sender; +pub type BroadcastReceiver = tokio::sync::broadcast::Receiver; -pub type BroadcastPair = ( - tokio::sync::broadcast::Sender, - tokio::sync::broadcast::Receiver, -); - -/// Either a freshly sequenced transaction hash or a batch -#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)] -pub enum UpdateItem { - Transaction((TxSequenceNumber, TransactionDigest)), - Batch(SignedBatch), -} +pub type BroadcastPair = (BroadcastSender, BroadcastReceiver); pub struct BatchSender { /// Channel for sending updates. @@ -64,7 +54,7 @@ pub struct BatchManager { /// Channel for receiving updates tx_recv: Receiver<(TxSequenceNumber, TransactionDigest)>, /// The sender end of the broadcast channel used to send updates to listeners - tx_broadcast: tokio::sync::broadcast::Sender, + tx_broadcast: BroadcastSender, /// Copy of the database to write batches and read transactions. db: Arc, } @@ -276,103 +266,3 @@ impl BatchManager { /// updates to clients. pub fn register_listener() {} } - -pub type BatchDigest = [u8; 32]; - -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)] -pub struct TransactionBatch(Vec<(TxSequenceNumber, TransactionDigest)>); -impl BcsSignable for TransactionBatch {} - -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)] -pub struct AuthorityBatch { - // TODO: Add epoch - /// The next sequence number after the end of this batch - next_sequence_number: u64, - - /// The first sequence number of this batch - initial_sequence_number: u64, - - // The number of items in the batch - size: u64, - - /// The digest of the previous block, if there is one - previous_digest: Option, - - // The digest of all transactions digests in this batch - transactions_digest: [u8; 32], -} - -impl BcsSignable for AuthorityBatch {} - -impl AuthorityBatch { - pub fn digest(&self) -> BatchDigest { - sha3_hash(self) - } - - /// The first batch for any authority indexes at zero - /// and has zero length. - pub fn initial() -> AuthorityBatch { - let to_hash = TransactionBatch(Vec::new()); - let transactions_digest = sha3_hash(&to_hash); - - AuthorityBatch { - next_sequence_number: 0, - initial_sequence_number: 0, - size: 0, - previous_digest: None, - transactions_digest, - } - } - - /// Make a batch, containing some transactions, and following the previous - /// batch. - pub fn make_next( - previous_batch: &AuthorityBatch, - transactions: &[(TxSequenceNumber, TransactionDigest)], - ) -> AuthorityBatch { - let transaction_vec = transactions.to_vec(); - debug_assert!(!transaction_vec.is_empty()); - - let initial_sequence_number = transaction_vec[0].0 as u64; - let next_sequence_number = (transaction_vec[transaction_vec.len() - 1].0 + 1) as u64; - - let to_hash = TransactionBatch(transaction_vec); - let transactions_digest = sha3_hash(&to_hash); - - AuthorityBatch { - next_sequence_number, - initial_sequence_number, - size: transactions.len() as u64, - previous_digest: Some(previous_batch.digest()), - transactions_digest, - } - } -} - -/// An transaction signed by a single authority -#[derive(Eq, Clone, Debug, Serialize, Deserialize)] -pub struct SignedBatch { - pub batch: AuthorityBatch, - pub authority: AuthorityName, - pub signature: AuthoritySignature, -} - -impl SignedBatch { - pub fn new( - batch: AuthorityBatch, - secret: &dyn signature::Signer, - authority: AuthorityName, - ) -> SignedBatch { - SignedBatch { - signature: AuthoritySignature::new(&batch, secret), - batch, - authority, - } - } -} - -impl PartialEq for SignedBatch { - fn eq(&self, other: &Self) -> bool { - self.batch == other.batch && self.authority == other.authority - } -} diff --git a/sui_core/src/authority_server.rs b/sui_core/src/authority_server.rs index fd515013c90e1..3e5d79d0d138f 100644 --- a/sui_core/src/authority_server.rs +++ b/sui_core/src/authority_server.rs @@ -6,11 +6,24 @@ use crate::authority::AuthorityState; use std::io; use sui_network::{ network::NetworkServer, - transport::{spawn_server, MessageHandler, SpawnedServer}, + transport::{spawn_server, MessageHandler, RwChannel, SpawnedServer}, }; -use sui_types::{error::*, messages::*, serialize::*}; +use sui_types::{batch::UpdateItem, error::*, messages::*, serialize::*}; + +use crate::authority_batch::BatchManager; +use futures::{SinkExt, StreamExt}; + +use std::time::Duration; use tracing::*; +use async_trait::async_trait; +use bytes::Bytes; +use tokio::sync::broadcast::error::RecvError; + +#[cfg(test)] +#[path = "unit_tests/server_tests.rs"] +mod server_tests; + pub struct AuthorityServer { server: NetworkServer, pub state: AuthorityState, @@ -29,6 +42,30 @@ impl AuthorityServer { } } + /// Create a batch subsystem, register it with the authority state, and + /// launch a task that manages it. Return the join handle of this task. + pub async fn spawn_batch_subsystem( + &mut self, + min_batch_size: u64, + max_delay: Duration, + ) -> Result, SuiError> { + // Start the batching subsystem, and register the handles with the authority. + let (tx_sender, manager, (batch_sender, _batch_receiver)) = + BatchManager::new(self.state.db(), 1000); + + let _batch_join_handle = manager + .start_service( + self.state.name, + self.state.secret.clone(), + min_batch_size, + max_delay, + ) + .await?; + self.state.set_batch_sender(tx_sender, batch_sender)?; + + Ok(_batch_join_handle) + } + pub async fn spawn(self) -> Result { let address = format!("{}:{}", self.server.base_address, self.server.base_port); let buffer_size = self.server.buffer_size; @@ -36,79 +73,196 @@ impl AuthorityServer { // Launch server for the appropriate protocol. spawn_server(&address, self, buffer_size).await } -} -impl MessageHandler for AuthorityServer { - fn handle_message<'a>( + async fn handle_batch_streaming<'a, 'b, A>( + &'a self, + request: BatchInfoRequest, + channel: &mut A, + ) -> Result<(), SuiError> + where + A: RwChannel<'b>, + { + // Register a subscriber to not miss any updates + let mut subscriber = self.state.subscribe()?; + let message_end = request.end; + + // Get the historical data requested + let (mut items, should_subscribe) = self.state.handle_batch_info_request(request).await?; + + let mut last_seq_sent = 0; + while let Some(item) = items.pop_front() { + // Remember the last transaction sequence number sent + if let UpdateItem::Transaction((seq, _)) = &item { + last_seq_sent = *seq; + } + + // Send all items back to the client + let item = serialize_batch_item(&BatchInfoResponseItem(item)); + channel + .sink() + .send(Bytes::from(item)) + .await + .map_err(|_| SuiError::CannotSendClientMessageError)?; + } + + // No need to send live events. + if !should_subscribe { + return Ok(()); + } + + // Now we read from the live updates. + loop { + match subscriber.recv().await { + Ok(item) => { + let seq = match &item { + UpdateItem::Transaction((seq, _)) => *seq, + UpdateItem::Batch(signed_batch) => signed_batch.batch.next_sequence_number, + }; + + // Do not re-send transactions already sent from the database + if seq <= last_seq_sent { + continue; + } + + let response = BatchInfoResponseItem(item); + + // Send back the item from the subscription + let resp = serialize_batch_item(&response); + channel + .sink() + .send(Bytes::from(resp)) + .await + .map_err(|_| SuiError::CannotSendClientMessageError)?; + + // We always stop sending at batch boundaries, so that we try to always + // start with a batch and end with a batch to allow signature verification. + if let BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) = &response { + if message_end < signed_batch.batch.next_sequence_number { + break; + } + } + } + Err(RecvError::Closed) => { + // The service closed the channel, so we tell the client. + return Err(SuiError::SubscriptionServiceClosed); + } + Err(RecvError::Lagged(number_skipped)) => { + // We tell the client they are too slow to consume, and + // stop. + return Err(SuiError::SubscriptionItemsDropedError(number_skipped)); + } + } + } + + Ok(()) + } + + async fn handle_one_message<'a, 'b, A>( &'a self, buffer: &'a [u8], - ) -> futures::future::BoxFuture<'a, Option>> { - Box::pin(async move { - let result = deserialize_message(buffer); - let reply = match result { - Err(_) => Err(SuiError::InvalidDecoding), - Ok(result) => { - match result { - SerializedMessage::Transaction(message) => self + channel: &mut A, + ) -> Option> + where + A: RwChannel<'b>, + { + let result = deserialize_message(buffer); + let reply = match result { + Err(_) => Err(SuiError::InvalidDecoding), + Ok(result) => { + match result { + SerializedMessage::Transaction(message) => self + .state + .handle_transaction(*message) + .await + .map(|info| Some(serialize_transaction_info(&info))), + SerializedMessage::Cert(message) => { + let confirmation_transaction = ConfirmationTransaction { + certificate: message.as_ref().clone(), + }; + match self .state - .handle_transaction(*message) + .handle_confirmation_transaction(confirmation_transaction) .await - .map(|info| Some(serialize_transaction_info(&info))), - SerializedMessage::Cert(message) => { - let confirmation_transaction = ConfirmationTransaction { - certificate: message.as_ref().clone(), - }; - match self - .state - .handle_confirmation_transaction(confirmation_transaction) - .await - { - Ok(info) => { - // Response - Ok(Some(serialize_transaction_info(&info))) - } - Err(error) => Err(error), + { + Ok(info) => { + // Response + Ok(Some(serialize_transaction_info(&info))) } + Err(error) => Err(error), } - SerializedMessage::AccountInfoReq(message) => self - .state - .handle_account_info_request(*message) - .await - .map(|info| Some(serialize_account_info_response(&info))), - SerializedMessage::ObjectInfoReq(message) => self - .state - .handle_object_info_request(*message) - .await - .map(|info| Some(serialize_object_info_response(&info))), - SerializedMessage::TransactionInfoReq(message) => self - .state - .handle_transaction_info_request(*message) - .await - .map(|info| Some(serialize_transaction_info(&info))), - _ => Err(SuiError::UnexpectedMessage), } + SerializedMessage::AccountInfoReq(message) => self + .state + .handle_account_info_request(*message) + .await + .map(|info| Some(serialize_account_info_response(&info))), + SerializedMessage::ObjectInfoReq(message) => self + .state + .handle_object_info_request(*message) + .await + .map(|info| Some(serialize_object_info_response(&info))), + SerializedMessage::TransactionInfoReq(message) => self + .state + .handle_transaction_info_request(*message) + .await + .map(|info| Some(serialize_transaction_info(&info))), + SerializedMessage::BatchInfoReq(message) => self + .handle_batch_streaming(*message, channel) + .await + .map(|_| None), + + _ => Err(SuiError::UnexpectedMessage), } - }; + } + }; + + self.server.increment_packets_processed(); - self.server.increment_packets_processed(); + if self.server.packets_processed() % 5000 == 0 { + info!( + "{}:{} has processed {} packets", + self.server.base_address, + self.server.base_port, + self.server.packets_processed() + ); + } - if self.server.packets_processed() % 5000 == 0 { - info!( - "{}:{} has processed {} packets", - self.server.base_address, - self.server.base_port, - self.server.packets_processed() - ); + match reply { + Ok(x) => x, + Err(error) => { + warn!("User query failed: {}", error); + self.server.increment_user_errors(); + Some(serialize_error(&error)) } + } + } +} - match reply { - Ok(x) => x, - Err(error) => { - warn!("User query failed: {}", error); - self.server.increment_user_errors(); - Some(serialize_error(&error)) +#[async_trait] +impl<'a, A> MessageHandler for AuthorityServer +where + A: 'static + RwChannel<'a> + Unpin + Send, +{ + async fn handle_messages(&self, mut channel: A) -> () { + loop { + let buffer = match channel.stream().next().await { + Some(Ok(buffer)) => buffer, + Some(Err(err)) => { + // We expect some EOF or disconnect error at the end. + error!("Error while reading TCP stream: {}", err); + break; } - } - }) + None => { + break; + } + }; + + if let Some(reply) = self.handle_one_message(&buffer[..], &mut channel).await { + let status = channel.sink().send(reply.into()).await; + if let Err(error) = status { + error!("Failed to send query response: {}", error); + } + }; + } } } diff --git a/sui_core/src/generate_format.rs b/sui_core/src/generate_format.rs index 9a2d423609f21..55e9398774ad2 100644 --- a/sui_core/src/generate_format.rs +++ b/sui_core/src/generate_format.rs @@ -12,6 +12,7 @@ use std::{fs::File, io::Write}; use structopt::{clap::arg_enum, StructOpt}; use sui_types::{ base_types::{self, ObjectDigest, ObjectID, TransactionDigest}, + batch::UpdateItem, crypto::{get_key_pair, AuthoritySignature, Signature}, error::SuiError, messages::{ExecutionStatus, ObjectInfoRequestKind, TransactionKind}, @@ -64,6 +65,7 @@ fn get_registry() -> Result { tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; + tracer.trace_type::(&samples)?; // The final and main entry point that we must document tracer.trace_type::(&samples)?; diff --git a/sui_core/src/unit_tests/authority_tests.rs b/sui_core/src/unit_tests/authority_tests.rs index 012dd6e065a4d..1cd2cba526078 100644 --- a/sui_core/src/unit_tests/authority_tests.rs +++ b/sui_core/src/unit_tests/authority_tests.rs @@ -1812,7 +1812,7 @@ pub async fn init_state_with_objects>(objects: I) } #[cfg(test)] -async fn init_state_with_object_id(address: SuiAddress, object: ObjectID) -> AuthorityState { +pub async fn init_state_with_object_id(address: SuiAddress, object: ObjectID) -> AuthorityState { init_state_with_ids(std::iter::once((address, object))).await } diff --git a/sui_core/src/unit_tests/batch_tests.rs b/sui_core/src/unit_tests/batch_tests.rs index ccbf47197f186..05777ade74813 100644 --- a/sui_core/src/unit_tests/batch_tests.rs +++ b/sui_core/src/unit_tests/batch_tests.rs @@ -246,8 +246,11 @@ async fn test_handle_move_order_with_batch() { .await .expect("No issues starting service."); + // Check we can subscribe + let mut rx = _pair.0.subscribe(); + authority_state - .set_batch_sender(_send) + .set_batch_sender(_send, _pair.0) .expect("No problem registering"); tokio::task::yield_now().await; @@ -260,8 +263,6 @@ async fn test_handle_move_order_with_batch() { .await .unwrap(); - let (_tx, mut rx) = _pair; - // Second and after is the one let y = rx.recv().await.unwrap(); println!("{:?}", y); @@ -272,8 +273,129 @@ async fn test_handle_move_order_with_batch() { assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_))); - drop(_tx); drop(authority_state); _join.await.expect("No issues ending task."); } + +#[tokio::test] +async fn test_batch_store_retrieval() { + // Create a random directory to store the DB + let dir = env::temp_dir(); + let path = dir.join(format!("DB_{:?}", ObjectID::random())); + fs::create_dir(&path).unwrap(); + + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + // Make a test key pair + let (_, key_pair) = get_key_pair(); + let key_pair = Arc::pin(key_pair); + let address = *key_pair.public_key_bytes(); + + // TEST 1: init from an empty database should return to a zero block + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let _join = manager + .start_service(address, key_pair, 10, Duration::from_secs(60)) + .await + .expect("Start service with no issues."); + + // Send transactions out of order + let tx_zero = TransactionDigest::new([0; 32]); + + let inner_store = store.clone(); + for i in 0u64..105 { + inner_store + .executed_sequence + .insert(&i, &tx_zero) + .expect("Failed to write."); + + _send + .send_item(i, tx_zero) + .await + .expect("Send to the channel."); + } + + // Add a few out of order transactions that should be ignored + // NOTE: gap between 104 and 110 + for i in 110u64..120 { + inner_store + .executed_sequence + .insert(&i, &tx_zero) + .expect("Failed to write."); + + _send + .send_item(i, tx_zero) + .await + .expect("Send to the channel."); + } + + // TEST 1: Get batches across boundaries + + let (batches, transactions) = store + .batches_and_transactions(12, 34) + .expect("Retrieval failed!"); + + assert_eq!(4, batches.len()); + assert_eq!(10, batches.first().unwrap().batch.next_sequence_number); + assert_eq!(40, batches.last().unwrap().batch.next_sequence_number); + + assert_eq!(30, transactions.len()); + + // TEST 2: Get with range wihin batch + let (batches, transactions) = store + .batches_and_transactions(54, 56) + .expect("Retrieval failed!"); + + assert_eq!(2, batches.len()); + assert_eq!(50, batches.first().unwrap().batch.next_sequence_number); + assert_eq!(60, batches.last().unwrap().batch.next_sequence_number); + + assert_eq!(10, transactions.len()); + + // TEST 3: Get on boundary + let (batches, transactions) = store + .batches_and_transactions(30, 50) + .expect("Retrieval failed!"); + + println!("{:?}", batches); + + assert_eq!(3, batches.len()); + assert_eq!(30, batches.first().unwrap().batch.next_sequence_number); + assert_eq!(50, batches.last().unwrap().batch.next_sequence_number); + + assert_eq!(20, transactions.len()); + + // TEST 4: Get past the end + let (batches, transactions) = store + .batches_and_transactions(94, 120) + .expect("Retrieval failed!"); + + println!("{:?}", batches); + + assert_eq!(2, batches.len()); + assert_eq!(90, batches.first().unwrap().batch.next_sequence_number); + assert_eq!(100, batches.last().unwrap().batch.next_sequence_number); + + assert_eq!(15, transactions.len()); + + // TEST 5: Both past the end + let (batches, transactions) = store + .batches_and_transactions(123, 222) + .expect("Retrieval failed!"); + + println!("{:?}", batches); + + assert_eq!(1, batches.len()); + assert_eq!(100, batches.first().unwrap().batch.next_sequence_number); + + assert_eq!(5, transactions.len()); + + // When we close the sending channel we also also end the service task + drop(_send); + drop(_pair); + + _join.await.expect("No errors in task"); +} diff --git a/sui_core/src/unit_tests/server_tests.rs b/sui_core/src/unit_tests/server_tests.rs new file mode 100644 index 0000000000000..919a94da6b559 --- /dev/null +++ b/sui_core/src/unit_tests/server_tests.rs @@ -0,0 +1,242 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use crate::authority::authority_tests::init_state_with_object_id; +use sui_types::base_types::{dbg_addr, dbg_object_id, TransactionDigest}; +use sui_types::object::ObjectFormatOptions; +use sui_types::serialize::{deserialize_message, serialize_object_info_request}; +use typed_store::Map; + +use super::*; + +#[tokio::test] +async fn test_start_stop_batch_subsystem() { + let sender = dbg_addr(1); + let object_id = dbg_object_id(1); + let authority_state = init_state_with_object_id(sender, object_id).await; + + let mut server = AuthorityServer::new("127.0.0.1".to_string(), 999, 65000, authority_state); + let join = server + .spawn_batch_subsystem(1000, Duration::from_secs(5)) + .await + .expect("Problem launching subsystem."); + + // Now drop the server to simulate the authority server ending processing. + drop(server); + + // This should return immediately. + join.await.expect("Error stoping subsystem"); +} + +// Some infra to feed the server messages and receive responses. + +use bytes::{Bytes, BytesMut}; +use futures::channel::mpsc::{channel, Receiver, Sender}; +use futures::sink::SinkMapErr; +use futures::{Sink, SinkExt}; + +type SinkSenderErr = + SinkMapErr, fn( as Sink>::Error) -> std::io::Error>; + +struct TestChannel { + reader: Receiver>, + writer: SinkSenderErr, +} + +#[allow(clippy::type_complexity)] // appease clippy, in the tests! +impl TestChannel { + pub fn new() -> ( + TestChannel, + (Sender>, Receiver), + ) { + let (outer_tx, inner_rx) = channel(1000); + let (inner_tx, outer_rx) = channel(1000); + + let test_channel = TestChannel { + reader: inner_rx, + writer: inner_tx + .sink_map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "SOme error!")), + }; + + (test_channel, (outer_tx, outer_rx)) + } +} + +impl<'a> RwChannel<'a> for TestChannel { + type R = Receiver>; + type W = SinkSenderErr; + + fn sink(&mut self) -> &mut Self::W { + &mut self.writer + } + fn stream(&mut self) -> &mut Self::R { + &mut self.reader + } +} + +//This is the most basic example of how to test the server logic + +#[tokio::test] +async fn test_channel_infra() { + let sender = dbg_addr(1); + let object_id = dbg_object_id(1); + let authority_state = init_state_with_object_id(sender, object_id).await; + + let server = Arc::new(AuthorityServer::new( + "127.0.0.1".to_string(), + 999, + 65000, + authority_state, + )); + + let (channel, (mut tx, mut rx)) = TestChannel::new(); + + let handle = tokio::spawn(async move { + server.handle_messages(channel).await; + }); + + let req = ObjectInfoRequest::latest_object_info_request( + object_id, + Some(ObjectFormatOptions::default()), + ); + + let bytes: BytesMut = BytesMut::from(&serialize_object_info_request(&req)[..]); + tx.send(Ok(bytes)).await.expect("Problem sending"); + let resp = rx.next().await; + assert!(!resp.unwrap().is_empty()); + + drop(tx); + handle.await.expect("Problem closing task"); +} + +#[tokio::test] +async fn test_subscription() { + let sender = dbg_addr(1); + let object_id = dbg_object_id(1); + let authority_state = init_state_with_object_id(sender, object_id).await; + + // Start the batch server + let mut server = AuthorityServer::new("127.0.0.1".to_string(), 998, 65000, authority_state); + + let db = server.state.db().clone(); + let db2 = server.state.db().clone(); + + let _join = server + .spawn_batch_subsystem(10, Duration::from_secs(500)) + .await + .expect("Problem launching subsystem."); + + let tx_zero = TransactionDigest::new([0; 32]); + for i in 0u64..105 { + db.executed_sequence + .insert(&i, &tx_zero) + .expect("Failed to write."); + + server + .state + .batch_sender() + .send_item(i, tx_zero) + .await + .expect("Send to the channel."); + } + + let (channel, (mut tx, mut rx)) = TestChannel::new(); + + let server = Arc::new(server); + + let inner_server1 = server.clone(); + let handle1 = tokio::spawn(async move { + inner_server1.handle_messages(channel).await; + }); + + // TEST 1: Get historical data + + let req = BatchInfoRequest { start: 12, end: 34 }; + + let bytes: BytesMut = BytesMut::from(&serialize_batch_request(&req)[..]); + tx.send(Ok(bytes)).await.expect("Problem sending"); + + let mut num_batches = 0; + let mut num_transactions = 0; + + while let Some(data) = rx.next().await { + match deserialize_message(&data[..]).expect("Bad response") { + SerializedMessage::BatchInfoResp(resp) => match *resp { + BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { + num_batches += 1; + if signed_batch.batch.next_sequence_number >= 34 { + break; + } + } + BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))) => { + num_transactions += 1; + } + }, + _ => { + panic!("Bad response"); + } + } + } + + assert_eq!(4, num_batches); + assert_eq!(30, num_transactions); + + // Test 2: Get subscription data + + // Add data in real time + let inner_server2 = server.clone(); + let _handle2 = tokio::spawn(async move { + for i in 105..150 { + tokio::time::sleep(Duration::from_millis(20)).await; + db2.executed_sequence + .insert(&i, &tx_zero) + .expect("Failed to write."); + println!("Send item {}", i); + inner_server2 + .state + .batch_sender() + .send_item(i, tx_zero) + .await + .expect("Send to the channel."); + } + }); + + let req = BatchInfoRequest { + start: 101, + end: 112, + }; + + let bytes: BytesMut = BytesMut::from(&serialize_batch_request(&req)[..]); + tx.send(Ok(bytes)).await.expect("Problem sending"); + + let mut num_batches = 0; + let mut num_transactions = 0; + + while let Some(data) = rx.next().await { + match deserialize_message(&data[..]).expect("Bad response") { + SerializedMessage::BatchInfoResp(resp) => match *resp { + BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) => { + num_batches += 1; + if signed_batch.batch.next_sequence_number >= 112 { + break; + } + } + BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest))) => { + println!("Received {}", seq); + num_transactions += 1; + } + }, + _ => { + panic!("Bad response"); + } + } + } + + assert_eq!(3, num_batches); + assert_eq!(20, num_transactions); + + drop(tx); + handle1.await.expect("Problem closing task"); +} diff --git a/sui_core/tests/format.rs b/sui_core/tests/format.rs index 2853bdd1b22cd..8d0a0bc51110a 100644 --- a/sui_core/tests/format.rs +++ b/sui_core/tests/format.rs @@ -4,6 +4,9 @@ #[test] fn test_format() { + // If this test breaks and you intended a format change, you need to run to get the fresh format: + // # cargo -q run --example generate-format -- print > sui_core/tests/staged/sui.yaml + let status = std::process::Command::new("cargo") .current_dir("..") .args(&["run", "--example", "generate-format", "--"]) diff --git a/sui_core/tests/staged/sui.yaml b/sui_core/tests/staged/sui.yaml index c4bc9a131b60b..4304e0cf1744c 100644 --- a/sui_core/tests/staged/sui.yaml +++ b/sui_core/tests/staged/sui.yaml @@ -18,11 +18,32 @@ AccountInfoResponse: - TYPENAME: ObjectDigest - owner: TYPENAME: SuiAddress +AuthorityBatch: + STRUCT: + - next_sequence_number: U64 + - initial_sequence_number: U64 + - size: U64 + - previous_digest: + OPTION: + TUPLEARRAY: + CONTENT: U8 + SIZE: 32 + - transactions_digest: + TUPLEARRAY: + CONTENT: U8 + SIZE: 32 AuthoritySignature: NEWTYPESTRUCT: TUPLEARRAY: CONTENT: U8 SIZE: 64 +BatchInfoRequest: + STRUCT: + - start: U64 + - end: U64 +BatchInfoResponseItem: + NEWTYPESTRUCT: + TYPENAME: UpdateItem CertifiedTransaction: STRUCT: - transaction: @@ -270,8 +291,24 @@ SerializedMessage: TransactionInfoReq: NEWTYPE: TYPENAME: TransactionInfoRequest + 10: + BatchInfoReq: + NEWTYPE: + TYPENAME: BatchInfoRequest + 11: + BatchInfoResp: + NEWTYPE: + TYPENAME: BatchInfoResponseItem Signature: NEWTYPESTRUCT: BYTES +SignedBatch: + STRUCT: + - batch: + TYPENAME: AuthorityBatch + - authority: + TYPENAME: PublicKeyBytes + - signature: + TYPENAME: AuthoritySignature SignedTransaction: STRUCT: - transaction: @@ -431,82 +468,98 @@ SuiError: STRUCT: - error: STR 39: + TransferImmutableError: UNIT + 40: + TooManyItemsError: + NEWTYPE: U64 + 41: + InvalidSequenceRangeError: UNIT + 42: + NoBatchesFoundError: UNIT + 43: + CannotSendClientMessageError: UNIT + 44: + SubscriptionItemsDropedError: + NEWTYPE: U64 + 45: + SubscriptionServiceClosed: UNIT + 46: ModuleLoadFailure: STRUCT: - error: STR - 40: + 47: ModuleVerificationFailure: STRUCT: - error: STR - 41: + 48: ModuleDeserializationFailure: STRUCT: - error: STR - 42: + 49: ModulePublishFailure: STRUCT: - error: STR - 43: + 50: ModuleBuildFailure: STRUCT: - error: STR - 44: + 51: DependentPackageNotFound: STRUCT: - package_id: TYPENAME: ObjectID - 45: + 52: MoveUnitTestFailure: STRUCT: - error: STR - 46: + 53: FunctionNotFound: STRUCT: - error: STR - 47: + 54: ModuleNotFound: STRUCT: - module_name: STR - 48: + 55: InvalidFunctionSignature: STRUCT: - error: STR - 49: + 56: TypeError: STRUCT: - error: STR - 50: + 57: AbortedExecution: STRUCT: - error: STR - 51: + 58: InvalidMoveEvent: STRUCT: - error: STR - 52: + 59: CircularObjectOwnership: UNIT - 53: + 60: GasBudgetTooHigh: STRUCT: - error: STR - 54: + 61: InsufficientGas: STRUCT: - error: STR - 55: + 62: InvalidTxUpdate: UNIT - 56: + 63: TransactionLockExists: UNIT - 57: + 64: TransactionLockDoesNotExist: UNIT - 58: + 65: TransactionLockReset: UNIT - 59: + 66: ObjectNotFound: STRUCT: - object_id: TYPENAME: ObjectID - 60: + 67: ObjectDeleted: STRUCT: - object_ref: @@ -514,26 +567,26 @@ SuiError: - TYPENAME: ObjectID - TYPENAME: SequenceNumber - TYPENAME: ObjectDigest - 61: + 68: BadObjectType: STRUCT: - error: STR - 62: + 69: MoveExecutionFailure: UNIT - 63: + 70: ObjectInputArityViolation: UNIT - 64: + 71: ExecutionInvariantViolation: UNIT - 65: + 72: AuthorityInformationUnavailable: UNIT - 66: + 73: AuthorityUpdateFailure: UNIT - 67: + 74: ByzantineAuthoritySuspicion: STRUCT: - authority: TYPENAME: PublicKeyBytes - 68: + 75: PairwiseSyncFailed: STRUCT: - xsource: @@ -544,29 +597,33 @@ SuiError: TYPENAME: TransactionDigest - error: TYPENAME: SuiError - 69: + 76: StorageError: NEWTYPE: TYPENAME: TypedStoreError - 70: + 77: BatchErrorSender: UNIT - 71: + 78: + GenericAuthorityError: + STRUCT: + - error: STR + 79: QuorumNotReached: STRUCT: - errors: SEQ: TYPENAME: SuiError - 72: + 80: ObjectSerializationError: UNIT - 73: + 81: ConcurrentTransactionError: UNIT - 74: + 82: IncorrectRecipientError: UNIT - 75: + 83: TooManyIncorrectAuthorities: UNIT - 76: + 84: IncorrectGasSplit: UNIT - 77: + 85: IncorrectGasMerge: UNIT Transaction: STRUCT: @@ -715,4 +772,16 @@ TypedStoreError: NEWTYPE: STR 3: CrossDBBatch: UNIT +UpdateItem: + ENUM: + 0: + Transaction: + NEWTYPE: + TUPLE: + - U64 + - TYPENAME: TransactionDigest + 1: + Batch: + NEWTYPE: + TYPENAME: SignedBatch diff --git a/sui_types/src/batch.rs b/sui_types/src/batch.rs new file mode 100644 index 0000000000000..f4eff4e1f70db --- /dev/null +++ b/sui_types/src/batch.rs @@ -0,0 +1,115 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::base_types::{AuthorityName, TransactionDigest}; +use crate::crypto::{sha3_hash, AuthoritySignature, BcsSignable}; +use serde::{Deserialize, Serialize}; + +pub type TxSequenceNumber = u64; + +/// Either a freshly sequenced transaction hash or a batch +#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum UpdateItem { + Transaction((TxSequenceNumber, TransactionDigest)), + Batch(SignedBatch), +} + +pub type BatchDigest = [u8; 32]; + +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)] +pub struct TransactionBatch(Vec<(TxSequenceNumber, TransactionDigest)>); +impl BcsSignable for TransactionBatch {} + +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Default, Debug, Serialize, Deserialize)] +pub struct AuthorityBatch { + // TODO: Add epoch + /// The next sequence number after the end of this batch + pub next_sequence_number: u64, + + /// The first sequence number of this batch + pub initial_sequence_number: u64, + + // The number of items in the batch + pub size: u64, + + /// The digest of the previous block, if there is one + pub previous_digest: Option, + + // The digest of all transactions digests in this batch + pub transactions_digest: [u8; 32], +} + +impl BcsSignable for AuthorityBatch {} + +impl AuthorityBatch { + pub fn digest(&self) -> BatchDigest { + sha3_hash(self) + } + + /// The first batch for any authority indexes at zero + /// and has zero length. + pub fn initial() -> AuthorityBatch { + let to_hash = TransactionBatch(Vec::new()); + let transactions_digest = sha3_hash(&to_hash); + + AuthorityBatch { + next_sequence_number: 0, + initial_sequence_number: 0, + size: 0, + previous_digest: None, + transactions_digest, + } + } + + /// Make a batch, containing some transactions, and following the previous + /// batch. + pub fn make_next( + previous_batch: &AuthorityBatch, + transactions: &[(TxSequenceNumber, TransactionDigest)], + ) -> AuthorityBatch { + let transaction_vec = transactions.to_vec(); + debug_assert!(!transaction_vec.is_empty()); + + let initial_sequence_number = transaction_vec[0].0 as u64; + let next_sequence_number = (transaction_vec[transaction_vec.len() - 1].0 + 1) as u64; + + let to_hash = TransactionBatch(transaction_vec); + let transactions_digest = sha3_hash(&to_hash); + + AuthorityBatch { + next_sequence_number, + initial_sequence_number, + size: transactions.len() as u64, + previous_digest: Some(previous_batch.digest()), + transactions_digest, + } + } +} + +/// An transaction signed by a single authority +#[derive(Eq, Clone, Debug, Serialize, Deserialize)] +pub struct SignedBatch { + pub batch: AuthorityBatch, + pub authority: AuthorityName, + pub signature: AuthoritySignature, +} + +impl SignedBatch { + pub fn new( + batch: AuthorityBatch, + secret: &dyn signature::Signer, + authority: AuthorityName, + ) -> SignedBatch { + SignedBatch { + signature: AuthoritySignature::new(&batch, secret), + batch, + authority, + } + } +} + +impl PartialEq for SignedBatch { + fn eq(&self, other: &Self) -> bool { + self.batch == other.batch && self.authority == other.authority + } +} diff --git a/sui_types/src/error.rs b/sui_types/src/error.rs index a561b49266cf0..33c4c5e53c315 100644 --- a/sui_types/src/error.rs +++ b/sui_types/src/error.rs @@ -134,6 +134,22 @@ pub enum SuiError { DuplicateObjectRefInput, #[error("Network error while querying service: {:?}.", error)] ClientIoError { error: String }, + #[error("Cannot transfer immutable object.")] + TransferImmutableError, + + // Errors related to batches + #[error("The number of items requested exceeds defined limits of {0}.")] + TooManyItemsError(u64), + #[error("The range specified is invalid.")] + InvalidSequenceRangeError, + #[error("No batches mached the range requested.")] + NoBatchesFoundError, + #[error("The channel to repond to the client returned an error.")] + CannotSendClientMessageError, + #[error("Subscription service had to drop {0} items")] + SubscriptionItemsDropedError(u64), + #[error("Subscription service closed.")] + SubscriptionServiceClosed, // Move module publishing related errors #[error("Failed to load the Move module, reason: {error:?}.")] @@ -215,6 +231,8 @@ pub enum SuiError { StorageError(#[from] typed_store::rocks::TypedStoreError), #[error("Batch error: cannot send transaction to batch.")] BatchErrorSender, + #[error("Authority Error: {error:?}")] + GenericAuthorityError { error: String }, #[error( "Failed to achieve quorum between authorities, cause by : {:#?}", diff --git a/sui_types/src/lib.rs b/sui_types/src/lib.rs index 84453a306350a..d2fa0b6b9cda6 100644 --- a/sui_types/src/lib.rs +++ b/sui_types/src/lib.rs @@ -14,6 +14,7 @@ use move_core_types::account_address::AccountAddress; pub mod error; pub mod base_types; +pub mod batch; pub mod coin; pub mod committee; pub mod crypto; diff --git a/sui_types/src/messages.rs b/sui_types/src/messages.rs index 6defb431a0e35..b6a3b5a0452c9 100644 --- a/sui_types/src/messages.rs +++ b/sui_types/src/messages.rs @@ -5,7 +5,7 @@ use crate::crypto::{sha3_hash, AuthoritySignature, BcsSignable, Signature}; use crate::object::{Object, ObjectFormatOptions, Owner, OBJECT_START_VERSION}; -use super::{base_types::*, committee::Committee, error::*, event::Event}; +use super::{base_types::*, batch::*, committee::Committee, error::*, event::Event}; #[cfg(test)] #[path = "unit_tests/messages_tests.rs"] @@ -188,6 +188,21 @@ pub struct AccountInfoRequest { pub account: SuiAddress, } +/// An information Request for batches, and their associated transactions +/// +/// This reads historic data and sends the batch and transactions in the +/// database starting at the batch that includes `start`, +/// and then listens to new transactions until a batch equal or +/// is over the batch end marker. +#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] +pub struct BatchInfoRequest { + pub start: TxSequenceNumber, + pub end: TxSequenceNumber, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct BatchInfoResponseItem(pub UpdateItem); + impl From for AccountInfoRequest { fn from(account: SuiAddress) -> Self { AccountInfoRequest { account } diff --git a/sui_types/src/serialize.rs b/sui_types/src/serialize.rs index 2467bfcb4619f..9bd804d81f598 100644 --- a/sui_types/src/serialize.rs +++ b/sui_types/src/serialize.rs @@ -24,6 +24,8 @@ pub enum SerializedMessage { ObjectInfoResp(Box), TransactionResp(Box), TransactionInfoReq(Box), + BatchInfoReq(Box), + BatchInfoResp(Box), } // This helper structure is only here to avoid cloning while serializing commands. @@ -42,6 +44,8 @@ enum ShallowSerializedMessage<'a> { ObjectInfoResp(&'a ObjectInfoResponse), TransactionResp(&'a TransactionInfoResponse), TransactionInfoReq(&'a TransactionInfoRequest), + BatchInfoReq(&'a BatchInfoRequest), + BatchInfoResp(&'a BatchInfoResponseItem), } fn serialize_into(writer: W, msg: &T) -> Result<(), anyhow::Error> @@ -119,6 +123,14 @@ pub fn serialize_vote(value: &SignedTransaction) -> Vec { serialize(&ShallowSerializedMessage::Vote(value)) } +pub fn serialize_batch_request(request: &BatchInfoRequest) -> Vec { + serialize(&ShallowSerializedMessage::BatchInfoReq(request)) +} + +pub fn serialize_batch_item(item: &BatchInfoResponseItem) -> Vec { + serialize(&ShallowSerializedMessage::BatchInfoResp(item)) +} + pub fn serialize_vote_into(writer: W, value: &SignedTransaction) -> Result<(), anyhow::Error> where W: std::io::Write,