From 86501f47d13ec848a05c75064fdd922b77523a40 Mon Sep 17 00:00:00 2001 From: Alberto Sonnino Date: Mon, 21 Mar 2022 14:12:15 +0000 Subject: [PATCH] [authority] Consensus client (#718) Consensus client & networked sequencer --- Cargo.toml | 1 + sui_core/Cargo.toml | 2 + sui_core/src/authority.rs | 37 +++- sui_core/src/authority/authority_store.rs | 35 +++- sui_core/src/consensus_client.rs | 204 +++++++++++++++++++ sui_core/src/lib.rs | 2 +- sui_core/src/unit_tests/authority_tests.rs | 7 +- sui_core/src/unit_tests/consensus_tests.rs | 141 +++++++++++++ sui_core/tests/staged/sui.yaml | 11 + sui_types/src/crypto.rs | 12 +- sui_types/src/error.rs | 3 + sui_types/src/messages.rs | 7 + sui_types/src/serialize.rs | 6 + test_utils/Cargo.toml | 12 +- test_utils/src/lib.rs | 2 +- test_utils/src/sequencer.rs | 226 ++++++++++++++++++--- 16 files changed, 646 insertions(+), 62 deletions(-) create mode 100644 sui_core/src/consensus_client.rs create mode 100644 sui_core/src/unit_tests/consensus_tests.rs diff --git a/Cargo.toml b/Cargo.toml index f7d1b4125a454..5952044d3b86f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "sui_programmability/adapter", "sui_programmability/framework", "sui_types", + "test_utils" ] [profile.release] diff --git a/sui_core/Cargo.toml b/sui_core/Cargo.toml index a40263360289f..6ef5d63c89e92 100644 --- a/sui_core/Cargo.toml +++ b/sui_core/Cargo.toml @@ -46,6 +46,8 @@ serde-reflection = "0.3.5" serde_yaml = "0.8.23" assert-str = "0.1.0" +test_utils = { path = "../test_utils" } + [[example]] name = "generate-format" path = "src/generate_format.rs" diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index 6ff458f087889..090e9ddd0b4ae 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -2,6 +2,10 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::{ + authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender}, + execution_engine, +}; use move_binary_format::CompiledModule; use move_bytecode_utils::module_cache::ModuleCache; use move_core_types::{ @@ -9,6 +13,7 @@ use move_core_types::{ resolver::{ModuleResolver, ResourceResolver}, }; use move_vm_runtime::native_functions::NativeFunctionTable; +use std::sync::atomic::AtomicUsize; use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, pin::Pin, @@ -29,11 +34,6 @@ use sui_types::{ }; use tracing::*; -use crate::{ - authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender}, - execution_engine, -}; - #[cfg(test)] #[path = "unit_tests/authority_tests.rs"] pub mod authority_tests; @@ -86,6 +86,9 @@ pub struct AuthorityState { /// and create batches for this authority. /// Keep as None if there is no need for this. batch_channels: Option<(BatchSender, BroadcastSender)>, + + /// Ensures there can only be a single consensus client is updating the state. + pub consensus_guardrail: AtomicUsize, } /// The authority state encapsulates all state, drives execution, and ensures safety. @@ -552,11 +555,14 @@ impl AuthorityState { pub async fn handle_consensus_certificate( &self, certificate: CertifiedTransaction, + last_consensus_index: SequenceNumber, ) -> SuiResult<()> { // Ensure it is a shared object certificate if !certificate.transaction.contains_shared_object() { - // TODO: Maybe add a warning here, no respectable authority should - // have sequenced this. + log::debug!( + "Transaction without shared object has been sequenced: {:?}", + certificate.transaction + ); return Ok(()); } @@ -574,11 +580,15 @@ impl AuthorityState { // Check the certificate. certificate.check(&self.committee)?; - // Persist the certificate. We are about to lock one or more shared object. + // Persist the certificate since we are about to lock one or more shared object. // We thus need to make sure someone (if not the client) can continue the protocol. - // Also atomically lock the shared objects for this particular transaction. + // Also atomically lock the shared objects for this particular transaction and + // increment the last consensus index. Note that a single process can ever call + // this function and that the last consensus index is also kept in memory. It is + // thus ok to only persist now (despite this function may have returned earlier). + // In the worst case, the synchronizer of the consensus client will catch up. self._database - .persist_certificate_and_lock_shared_objects(&transaction_digest, certificate) + .persist_certificate_and_lock_shared_objects(certificate, last_consensus_index) } pub async fn handle_transaction_info_request( @@ -762,7 +772,7 @@ impl AuthorityState { let native_functions = sui_framework::natives::all_natives(MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS); - AuthorityState { + Self { committee, name, secret, @@ -771,6 +781,7 @@ impl AuthorityState { .expect("We defined natives to not fail here"), _database: store, batch_channels: None, + consensus_guardrail: AtomicUsize::new(0), } } @@ -933,6 +944,10 @@ impl AuthorityState { ) -> Result, SuiError> { self._database.get_latest_parent_entry(object_id) } + + pub fn last_consensus_index(&self) -> SuiResult { + self._database.last_consensus_index() + } } impl ModuleResolver for AuthorityState { diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index 6feda91c6d0e3..362f50df3cdad 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -24,6 +24,10 @@ pub type ReplicaStore = SuiDataStore; const NUM_SHARDS: usize = 4096; +/// The key where the latest consensus index is stored in the database. +// TODO: Make a single table (e.g., called `variables`) storing all our lonely variables in one place. +const LAST_CONSENSUS_INDEX_ADDR: u64 = 0; + /// ALL_OBJ_VER determines whether we want to store all past /// versions of every object in the store. Authority doesn't store /// them, but other entities such as replicas will. @@ -94,6 +98,12 @@ pub struct SuiDataStore { /// A sequence of batches indexing into the sequence of executed transactions. pub batches: DBMap, + /// The following table is used to store a single value (the corresponding key is a constant). The value + /// represents the index of the latest consensus message this authority processed. This field is written + /// by a single process acting as consensus (light) client. It is used to ensure the authority processes + /// every message output by consensus (and in the right order). + last_consensus_index: DBMap, + /// The next available sequence number to use in the `executed sequence` table. pub next_sequence_number: AtomicU64, } @@ -174,6 +184,7 @@ impl SuiDataStore { ("schedule", &options), ("executed_sequence", &options), ("batches", &options), + ("last_consensus_index", &options), ], ) .expect("Cannot open DB."); @@ -205,6 +216,7 @@ impl SuiDataStore { sequenced, schedule, batches, + last_consensus_index, ) = reopen! ( &db, "objects";, @@ -217,7 +229,8 @@ impl SuiDataStore { "signed_effects";, "sequenced";<(TransactionDigest, ObjectID), SequenceNumber>, "schedule";, - "batches"; + "batches";, + "last_consensus_index"; ); AuthorityStore { objects, @@ -236,6 +249,7 @@ impl SuiDataStore { .collect(), executed_sequence, batches, + last_consensus_index, next_sequence_number, } } @@ -800,27 +814,32 @@ impl SuiDataStore { Ok(write_batch) } - /// Lock a sequence number for the shared objects of the input transaction. + /// Lock a sequence number for the shared objects of the input transaction. Also update the + /// last consensus index. pub fn persist_certificate_and_lock_shared_objects( &self, - transaction_digest: &TransactionDigest, certificate: CertifiedTransaction, + global_certificate_index: SequenceNumber, ) -> Result<(), SuiError> { + let transaction_digest = certificate.transaction.digest(); let certificate_to_write = std::iter::once((transaction_digest, &certificate)); let mut sequenced_to_write = Vec::new(); let mut schedule_to_write = Vec::new(); for id in certificate.transaction.shared_input_objects() { let version = self.schedule.get(id)?.unwrap_or_default(); - sequenced_to_write.push(((*transaction_digest, *id), version)); + sequenced_to_write.push(((transaction_digest, *id), version)); let next_version = version.increment(); schedule_to_write.push((*id, next_version)); } + let index_to_write = std::iter::once((LAST_CONSENSUS_INDEX_ADDR, global_certificate_index)); + let mut write_batch = self.sequenced.batch(); write_batch = write_batch.insert_batch(&self.certificates, certificate_to_write)?; write_batch = write_batch.insert_batch(&self.sequenced, sequenced_to_write)?; write_batch = write_batch.insert_batch(&self.schedule, schedule_to_write)?; + write_batch = write_batch.insert_batch(&self.last_consensus_index, index_to_write)?; write_batch.write().map_err(SuiError::from) } @@ -925,6 +944,14 @@ impl SuiDataStore { Ok((batches, transactions)) } + + /// Return the latest consensus index. It is used to bootstrap the consensus client. + pub fn last_consensus_index(&self) -> SuiResult { + self.last_consensus_index + .get(&LAST_CONSENSUS_INDEX_ADDR) + .map(|x| x.unwrap_or_default()) + .map_err(SuiError::from) + } } impl BackingPackageStore for SuiDataStore { diff --git a/sui_core/src/consensus_client.rs b/sui_core/src/consensus_client.rs new file mode 100644 index 0000000000000..cf8d5d63daf6d --- /dev/null +++ b/sui_core/src/consensus_client.rs @@ -0,0 +1,204 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::authority::AuthorityState; +use bytes::Bytes; +use std::cmp::Ordering; +use std::net::SocketAddr; +use std::sync::atomic::Ordering as AtomicOrdering; +use std::sync::Arc; +use sui_network::transport; +use sui_types::base_types::SequenceNumber; +use sui_types::error::{SuiError, SuiResult}; +use sui_types::messages::{ConfirmationTransaction, ConsensusOutput}; +use sui_types::serialize::{deserialize_message, SerializedMessage}; +use sui_types::{fp_bail, fp_ensure}; +use tokio::task::JoinHandle; + +#[cfg(test)] +#[path = "unit_tests/consensus_tests.rs"] +pub mod consensus_tests; + +/// The `ConsensusClient` receives certificates sequenced by the consensus and updates +/// the authority's database. The client assumes that the messages it receives have +/// already been authenticated (ie. they really come from a trusted consensus node) and +/// integrity-validated (ie. no corrupted messages). +pub struct ConsensusClient { + /// The (global) authority state to update the locks of shared objects. + state: Arc, + /// The index of the latest consensus message we processed. + last_consensus_index: SequenceNumber, +} + +impl Drop for ConsensusClient { + fn drop(&mut self) { + self.state + .consensus_guardrail + .fetch_sub(1, AtomicOrdering::SeqCst); + } +} + +impl ConsensusClient { + /// Create a new consensus handler with the input authority state. + pub fn new(state: Arc) -> SuiResult { + // Ensure there is a single consensus client modifying the state. + let status = state + .consensus_guardrail + .fetch_add(1, AtomicOrdering::SeqCst); + fp_ensure!(status == 0, SuiError::OnlyOneConsensusClientPermitted); + + // Load the last consensus index from storage. + let last_consensus_index = state.last_consensus_index()?; + + // Return a consensus client only if all went well (safety-critical). + Ok(Self { + state, + last_consensus_index, + }) + } + + /// Spawn the consensus client in a new tokio task. + pub fn spawn( + mut handler: Self, + address: SocketAddr, + buffer_size: usize, + ) -> JoinHandle> { + log::info!("Consensus client connecting to {}", address); + tokio::spawn(async move { + handler.synchronize().await?; + handler.run(address, buffer_size).await?; + Ok(()) + }) + } + + /// Synchronize with the consensus in case we missed part of its output sequence. + /// It is safety-critical that we process the consensus outputs in the right order. + async fn synchronize(&mut self) -> SuiResult<()> { + // TODO [issue #932]: [liveness-critical] Implement the synchronizer. + Ok(()) + } + + /// Process a single sequenced certificate. + async fn handle_consensus_message(&mut self, bytes: Bytes) -> SuiResult<()> { + // We first deserialize the consensus output message. If deserialization fails + // we may be have a liveness issue. We stop processing of this certificate to + // ensure safety, and the synchronizer will try again to ask for that certificate. + let (consensus_message, consensus_index) = match deserialize_message(&*bytes) { + Ok(SerializedMessage::ConsensusOutput(value)) => { + let ConsensusOutput { + message, + sequencer_number, + } = *value; + (message, sequencer_number) + } + Ok(_) => { + log::error!("{}", SuiError::UnexpectedMessage); + return Err(SuiError::UnexpectedMessage); + } + Err(e) => { + log::error!("Failed to deserialize consensus output {}", e); + return Err(SuiError::InvalidDecoding); + } + }; + + // Check that the latest consensus index is as expected; otherwise synchronize. + match self.last_consensus_index.cmp(&consensus_index) { + Ordering::Greater => { + // Something is very wrong. Liveness may be lost (but not safety). + log::error!("Consensus index of authority bigger than expected"); + return Ok(()); + } + Ordering::Less => { + log::debug!("Authority is synchronizing missed sequenced certificates"); + self.synchronize().await?; + return Ok(()); + } + Ordering::Equal => (), + } + + // Update the latest consensus index. The authority state will atomically + // update it in the storage when processing the certificate. It is important to + // increment the consensus index before deserializing the certificate because + // the consensus core will increment its own index regardless of deserialization + // or other protocol-specific failures. + self.last_consensus_index = self.last_consensus_index.increment(); + + // The consensus simply orders bytes, so we first need to deserialize the + // certificate. If the deserialization fail it is safe to ignore the + // certificate since all correct authorities will do the same. Remember that a + // bad authority or client may input random bytes to the consensus. + let confirmation = match deserialize_message(&*consensus_message) { + Ok(SerializedMessage::Cert(certificate)) => ConfirmationTransaction { + certificate: *certificate, + }, + Ok(_) => { + log::debug!("{}", SuiError::UnexpectedMessage); + return Err(SuiError::UnexpectedMessage); + } + Err(e) => { + log::debug!("Failed to deserialize certificate {}", e); + return Err(SuiError::InvalidDecoding); + } + }; + + // Process the certificate to set the locks on the shared objects. It also + // atomically update the last consensus index in storage. It is safety-critical + // that only this task calls the function below. Safety is preserved even if an + // authority crashes before this point but after having processed a number of + // badly serialized certificates, but the synchronizer will have to do more work. + let certificate = confirmation.certificate; + self.state + .handle_consensus_certificate(certificate, self.last_consensus_index) + .await + } + + /// Main loop connecting to the consensus. This mainly acts as a light client. + async fn run(&mut self, address: SocketAddr, buffer_size: usize) -> SuiResult<()> { + // TODO [issue #931]: Do not try to reconnect immediately after the connection fails, use some + // sort of back off. We may also move this logic to `sui-network::transport` to + // expose a 'stream client' or something like that. + 'main: loop { + // Subscribe to the consensus' output. + let mut connection = match transport::connect(address.to_string(), buffer_size).await { + Ok(connection) => connection, + Err(e) => { + log::warn!("Failed to subscribe to consensus output: {}", e); + continue 'main; + } + }; + + // Listen to sequenced certificates and process them. + loop { + let bytes = match connection.read_data().await { + Some(Ok(data)) => Bytes::from(data), + Some(Err(e)) => { + log::warn!("Failed to receive data from consensus: {}", e); + continue 'main; + } + None => { + log::debug!("Connection dropped by consensus"); + continue 'main; + } + }; + + match self.handle_consensus_message(bytes).await { + // Log the errors that are our faults (not the client's). + Err(SuiError::StorageError(e)) => { + log::error!("{}", e); + + // If we have a store error we cannot continue processing other + // outputs from consensus. We may otherwise attribute locks to + // shared objects that are different from other authorities. It + // is however safe to ask for that certificate again and re-process + // it (the core is idempotent). + fp_bail!(SuiError::StorageError(e)); + } + // Log the errors that are the client's fault (not ours). This is + // only for debug purposes: all correct authorities will do the same. + Err(e) => log::debug!("{}", e), + Ok(()) => (), + } + } + } + } +} diff --git a/sui_core/src/lib.rs b/sui_core/src/lib.rs index d4b5fe7fc530e..a6f2fc9727ac1 100644 --- a/sui_core/src/lib.rs +++ b/sui_core/src/lib.rs @@ -7,7 +7,7 @@ pub mod authority_aggregator; pub mod authority_batch; pub mod authority_client; pub mod authority_server; -pub mod consensus_handler; +pub mod consensus_client; pub mod execution_engine; pub mod gateway_state; pub mod safe_client; diff --git a/sui_core/src/unit_tests/authority_tests.rs b/sui_core/src/unit_tests/authority_tests.rs index 8aa93b4ebf3de..4db0fb87c514f 100644 --- a/sui_core/src/unit_tests/authority_tests.rs +++ b/sui_core/src/unit_tests/authority_tests.rs @@ -1376,7 +1376,7 @@ fn init_state_parameters() -> (Committee, SuiAddress, KeyPair, Arc AuthorityState { +pub async fn init_state() -> AuthorityState { let (committee, _, authority_key, store) = init_state_parameters(); AuthorityState::new( committee, @@ -1643,7 +1643,10 @@ async fn shared_object() { // Sequence the certificate to assign a sequence number to the shared object. authority - .handle_consensus_certificate(certificate) + .handle_consensus_certificate( + certificate, + /* last_consensus_index */ SequenceNumber::new(), + ) .await .unwrap(); diff --git a/sui_core/src/unit_tests/consensus_tests.rs b/sui_core/src/unit_tests/consensus_tests.rs new file mode 100644 index 0000000000000..496f9ab6e183d --- /dev/null +++ b/sui_core/src/unit_tests/consensus_tests.rs @@ -0,0 +1,141 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use crate::authority::authority_tests::get_genesis_package_by_module; +use crate::authority::authority_tests::{init_state, init_state_with_objects}; +use move_core_types::account_address::AccountAddress; +use move_core_types::ident_str; +use rand::rngs::StdRng; +use rand::SeedableRng; +use std::time::Duration; +use sui_adapter::genesis; +use sui_network::transport; +use sui_types::base_types::{ObjectID, TransactionDigest}; +use sui_types::crypto::{get_key_pair_from_rng, Signature}; +use sui_types::messages::{SignatureAggregator, Transaction, TransactionData}; +use sui_types::object::{Data, Object, Owner}; +use sui_types::serialize::serialize_cert; +use test_utils::sequencer::Sequencer; + +/// Default network buffer size. +const NETWORK_BUFFER_SIZE: usize = 65_000; + +#[tokio::test] +async fn handle_consensus_output() { + let mut rng = StdRng::from_seed([0; 32]); + let (sender, keypair) = get_key_pair_from_rng(&mut rng); + + // Initialize an authority with a (owned) gas object and a shared object. + let gas_object_id = ObjectID::random(); + let gas_object = Object::with_id_owner_for_testing(gas_object_id, sender); + let gas_object_ref = gas_object.compute_object_reference(); + + let shared_object_id = ObjectID::random(); + let shared_object = { + use sui_types::gas_coin::GasCoin; + use sui_types::object::MoveObject; + + let content = GasCoin::new(shared_object_id, SequenceNumber::new(), 10); + let data = Data::Move(MoveObject::new( + /* type */ GasCoin::type_(), + content.to_bcs_bytes(), + )); + Object { + data, + owner: Owner::SharedMutable, + previous_transaction: TransactionDigest::genesis(), + } + }; + + let authority = init_state_with_objects(vec![gas_object, shared_object]).await; + + // Make a sample transaction. + let module = "ObjectBasics"; + let function = "create"; + let genesis_package_objects = genesis::clone_genesis_packages(); + let package_object_ref = get_genesis_package_by_module(&genesis_package_objects, module); + + let data = TransactionData::new_move_call( + sender, + package_object_ref, + ident_str!(module).to_owned(), + ident_str!(function).to_owned(), + /* type_args */ vec![], + gas_object_ref, + /* object_args */ vec![], + vec![shared_object_id], + /* pure_args */ + vec![ + 16u64.to_le_bytes().to_vec(), + bcs::to_bytes(&AccountAddress::from(sender)).unwrap(), + ], + /* max_gas */ 10_000, + ); + let signature = Signature::new(&data, &keypair); + let transaction = Transaction::new(data, signature); + + // Submit the transaction and assemble a certificate. + let response = authority + .handle_transaction(transaction.clone()) + .await + .unwrap(); + let vote = response.signed_transaction.unwrap(); + let certificate = SignatureAggregator::try_new(transaction, &authority.committee) + .unwrap() + .append(vote.authority, vote.signature) + .unwrap() + .unwrap(); + let serialized_certificate = serialize_cert(&certificate); + + // Spawn a sequencer. + // TODO [issue #932]: Use a port allocator to avoid port conflicts. + let consensus_input_address = "127.0.0.1:1309".parse().unwrap(); + let consensus_subscriber_address = "127.0.0.1:1310".parse().unwrap(); + let sequencer = Sequencer { + input_address: consensus_input_address, + subscriber_address: consensus_subscriber_address, + buffer_size: NETWORK_BUFFER_SIZE, + consensus_delay: Duration::from_millis(0), + }; + Sequencer::spawn(sequencer).await; + + // Spawn a consensus client. + let state = Arc::new(authority); + let consensus_client = ConsensusClient::new(state.clone()).unwrap(); + ConsensusClient::spawn( + consensus_client, + consensus_subscriber_address, + NETWORK_BUFFER_SIZE, + ); + + // Submit a certificate to the sequencer. + tokio::task::yield_now().await; + transport::connect(consensus_input_address.to_string(), NETWORK_BUFFER_SIZE) + .await + .unwrap() + .write_data(&serialized_certificate) + .await + .unwrap(); + + // Wait for the certificate to be processed and ensure the last consensus index + // has been updated. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert_eq!( + state.db().last_consensus_index().unwrap(), + SequenceNumber::from(1) + ); +} + +#[tokio::test] +async fn test_guardrail() { + let authority = init_state().await; + let state = Arc::new(authority); + + // Create a first consensus client. + let _consensus_client = ConsensusClient::new(state.clone()).unwrap(); + + // Create a second consensus client from the same state. + let result = ConsensusClient::new(state); + assert!(result.is_err()); +} diff --git a/sui_core/tests/staged/sui.yaml b/sui_core/tests/staged/sui.yaml index d18a9f8563bbe..5af5b7d36364a 100644 --- a/sui_core/tests/staged/sui.yaml +++ b/sui_core/tests/staged/sui.yaml @@ -117,6 +117,11 @@ CertifiedTransaction: TUPLE: - TYPENAME: PublicKeyBytes - TYPENAME: AuthoritySignature +ConsensusOutput: + STRUCT: + - message: BYTES + - sequencer_number: + TYPENAME: SequenceNumber Data: ENUM: 0: @@ -368,6 +373,10 @@ SerializedMessage: BatchInfoResp: NEWTYPE: TYPENAME: BatchInfoResponseItem + 12: + ConsensusOutput: + NEWTYPE: + TYPENAME: ConsensusOutput Signature: NEWTYPESTRUCT: BYTES SignedBatch: @@ -728,6 +737,8 @@ SuiError: InconsistentGatewayResult: STRUCT: - error: STR + 89: + OnlyOneConsensusClientPermitted: UNIT Transaction: STRUCT: - data: diff --git a/sui_types/src/crypto.rs b/sui_types/src/crypto.rs index f717001c63d4b..6d8169d3b5779 100644 --- a/sui_types/src/crypto.rs +++ b/sui_types/src/crypto.rs @@ -135,13 +135,19 @@ impl std::fmt::Debug for PublicKeyBytes { // TODO: get_key_pair() and get_key_pair_from_bytes() should return KeyPair only. // TODO: rename to random_key_pair pub fn get_key_pair() -> (SuiAddress, KeyPair) { - let mut csprng = OsRng; - let kp = dalek::Keypair::generate(&mut csprng); + get_key_pair_from_rng(&mut OsRng) +} + +/// Generate a keypair from the specified RNG (useful for testing with seedable rngs). +pub fn get_key_pair_from_rng(csprng: &mut R) -> (SuiAddress, KeyPair) +where + R: rand::CryptoRng + rand::RngCore, +{ + let kp = dalek::Keypair::generate(csprng); let keypair = KeyPair { key_pair: kp, public_key_cell: OnceCell::new(), }; - (SuiAddress::from(keypair.public_key_bytes()), keypair) } diff --git a/sui_types/src/error.rs b/sui_types/src/error.rs index 49ce3c0bc41fb..f1a30881d0835 100644 --- a/sui_types/src/error.rs +++ b/sui_types/src/error.rs @@ -265,6 +265,9 @@ pub enum SuiError { }, #[error("Inconsistent results observed in the Gateway. This should not happen and typically means there is a bug in the Sui implementation. Details: {error:?}")] InconsistentGatewayResult { error: String }, + + #[error("Authority state can be modified by a single consensus client at the time")] + OnlyOneConsensusClientPermitted, } pub type SuiResult = Result; diff --git a/sui_types/src/messages.rs b/sui_types/src/messages.rs index 15e731f57b40c..1edb51e07ce56 100644 --- a/sui_types/src/messages.rs +++ b/sui_types/src/messages.rs @@ -1083,3 +1083,10 @@ impl ConfirmationTransaction { } impl BcsSignable for TransactionData {} + +#[derive(Serialize, Deserialize)] +pub struct ConsensusOutput { + #[serde(with = "serde_bytes")] + pub message: Vec, + pub sequencer_number: SequenceNumber, +} diff --git a/sui_types/src/serialize.rs b/sui_types/src/serialize.rs index 9bd804d81f598..87e976f4a0310 100644 --- a/sui_types/src/serialize.rs +++ b/sui_types/src/serialize.rs @@ -26,6 +26,7 @@ pub enum SerializedMessage { TransactionInfoReq(Box), BatchInfoReq(Box), BatchInfoResp(Box), + ConsensusOutput(Box), } // This helper structure is only here to avoid cloning while serializing commands. @@ -46,6 +47,7 @@ enum ShallowSerializedMessage<'a> { TransactionInfoReq(&'a TransactionInfoRequest), BatchInfoReq(&'a BatchInfoRequest), BatchInfoResp(&'a BatchInfoResponseItem), + ConsensusOutput(&'a ConsensusOutput), } fn serialize_into(writer: W, msg: &T) -> Result<(), anyhow::Error> @@ -152,6 +154,10 @@ where serialize_into(writer, &ShallowSerializedMessage::TransactionResp(value)) } +pub fn serialize_consensus_output(value: &ConsensusOutput) -> Vec { + serialize(&ShallowSerializedMessage::ConsensusOutput(value)) +} + pub fn deserialize_message(reader: R) -> Result where R: std::io::Read, diff --git a/test_utils/Cargo.toml b/test_utils/Cargo.toml index 6e158f096643c..b20e510616859 100644 --- a/test_utils/Cargo.toml +++ b/test_utils/Cargo.toml @@ -8,14 +8,12 @@ edition = "2021" [dependencies] tokio = { version = "1.16.1", features = ["sync", "rt"] } -rand = "0.7.3" -rocksdb = "0.17.0" -fdlimit = "0.2.1" -portpicker = "0.1.1" bytes = "1.1.0" futures = "0.3.21" +async-trait = "0.1.52" +log = "0.4.14" +rand = "0.7.3" +rocksdb = "0.18.0" sui-types = { path = "../sui_types" } -sui_core = { path = "../sui_core" } -sui-adapter = { path = "../sui_programmability/adapter" } -sui = { path = "../sui" } \ No newline at end of file +sui-network = { path = "../network_utils" } \ No newline at end of file diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 8460c84a7c952..8dce77562d0a9 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -1,4 +1,4 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -pub mod sequencer; \ No newline at end of file +pub mod sequencer; diff --git a/test_utils/src/sequencer.rs b/test_utils/src/sequencer.rs index 1e48705b66834..77e0bce374dbb 100644 --- a/test_utils/src/sequencer.rs +++ b/test_utils/src/sequencer.rs @@ -1,67 +1,227 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use async_trait::async_trait; +use bytes::Bytes; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; +use futures::SinkExt; +use std::net::SocketAddr; use std::time::Duration; -use tokio::sync::{broadcast, mpsc}; -use tokio::task::JoinHandle; +use sui_network::transport::{MessageHandler, RwChannel}; +use sui_types::base_types::SequenceNumber; +use sui_types::messages::ConsensusOutput; +use sui_types::serialize::serialize_consensus_output; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; -/// A mock single-process sequencer. This will be replaced by a proper consensus protocol. -pub struct MockSequencer { - /// The delay to wait before sequencing a message. This parameter emulates latency. - delay: Duration, - /// Receive input messages to sequence. - rx_input: mpsc::Receiver, - /// Deliver a sequence of messages. - tx_output: broadcast::Sender, +/// A mock single-process sequencer. It is not crash-safe (it has no storage) and should +/// only be used for testing. +pub struct Sequencer { + /// The network address where to receive input messages. + pub input_address: SocketAddr, + /// The network address where to receive subscriber requests. + pub subscriber_address: SocketAddr, + /// The network buffer size. + pub buffer_size: usize, + /// The delay to wait before sequencing a message. This parameter is only required to + /// emulates the consensus' latency. + pub consensus_delay: Duration, } -impl MockSequencer +impl Sequencer { + /// Spawn a new sequencer. The sequencer is made of a number of component each running + /// in their own tokio task. + pub async fn spawn(sequencer: Self) { + let (tx_input, rx_input) = channel(100); + let (tx_subscriber, rx_subscriber) = channel(100); + + // Spawn the sequencer core. + tokio::spawn(async move { + SequencerCore::new(rx_input, rx_subscriber) + .run(sequencer.consensus_delay) + .await; + }); + + // Spawn the server receiving input messages to order. + tokio::spawn(async move { + let input_server = InputServer { tx_input }; + sui_network::transport::spawn_server( + &sequencer.input_address.to_string(), + input_server, + sequencer.buffer_size, + ) + .await + .unwrap() + .join() + .await + .unwrap(); + }); + + // Spawn the server receiving subscribers to the output of the sequencer. + tokio::spawn(async move { + let subscriber_server = SubscriberServer { tx_subscriber }; + sui_network::transport::spawn_server( + &sequencer.subscriber_address.to_string(), + subscriber_server, + sequencer.buffer_size, + ) + .await + .unwrap() + .join() + .await + .unwrap(); + }); + } +} + +/// The core of the sequencer, totally ordering input bytes. +pub struct SequencerCore { + /// Receive users' certificates to sequence + rx_input: Receiver, + /// Receive subscribers to update with the output of the sequence. + rx_subscriber: Receiver>, + /// The global consensus index. + consensus_index: SequenceNumber, +} + +impl SequencerCore where - Message: std::fmt::Debug + Send + Sync + 'static, + Message: Clone + Send + Sync + 'static, { - /// Spawn a new `Sequencer` in a separate tokio task. - pub fn spawn( - delay: Duration, - rx_input: mpsc::Receiver, - tx_output: broadcast::Sender, - ) -> JoinHandle<()> { - tokio::spawn(async move { - Self { - delay, - rx_input, - tx_output, - } - .run() - .await; - }) + /// Create a new sequencer core instance. + pub fn new( + rx_input: Receiver, + rx_subscriber: Receiver>, + ) -> Self { + Self { + rx_input, + rx_subscriber, + consensus_index: SequenceNumber::new(), + } } - /// Helper function. It simply waits for a fixed delay and then returns the input. + /// Simply wait for a fixed delay and then returns the input. async fn waiter(deliver: Message, delay: Duration) -> Message { sleep(delay).await; deliver } /// Main loop ordering input bytes. - async fn run(&mut self) { + pub async fn run(&mut self, consensus_delay: Duration) { let mut waiting = FuturesUnordered::new(); + let mut subscribers = Vec::new(); loop { tokio::select! { // Receive bytes to order. Some(message) = self.rx_input.recv() => { - waiting.push(Self::waiter(message, self.delay)); + waiting.push(Self::waiter(message, consensus_delay)); + }, + + // Receive subscribers to update with the sequencer's output. + Some(sender) = self.rx_subscriber.recv() => { + subscribers.push(sender); }, - // Bytes are ready to be delivered. + // Bytes are ready to be delivered, notify the subscribers. Some(message) = waiting.next() => { - if let Err(e) = self.tx_output.send(message) { - log::warn!("Failed to output sequence: {}", e); + // Notify the subscribers of the new output. + let mut to_drop = Vec::new(); + for (i, subscriber) in subscribers.iter().enumerate() { + if subscriber.send((message.clone(), self.consensus_index)).await.is_err() { + to_drop.push(i); + } + } + + // Increment the consensus index. + self.consensus_index = self.consensus_index.increment(); + + // Cleanup the list subscribers that dropped connection. + for index in to_drop { + subscribers.remove(index); } } } } } } + +/// Define how the network server should handle incoming clients' certificates. This +/// is not got to stream many input transactions (benchmarks) as the task handling the +/// TCP connection blocks until a reply is ready. +struct InputServer { + /// Send user transactions to the sequencer. + pub tx_input: Sender, +} + +#[async_trait] +impl<'a, Stream> MessageHandler for InputServer +where + Stream: 'static + RwChannel<'a> + Unpin + Send, +{ + async fn handle_messages(&self, mut stream: Stream) { + loop { + // Read the user's certificate. + let buffer = match stream.stream().next().await { + Some(Ok(buffer)) => buffer, + Some(Err(e)) => { + log::warn!("Error while reading TCP stream: {}", e); + break; + } + None => { + log::debug!("Connection dropped by the client"); + break; + } + }; + + // Send the certificate to the sequencer. + if self.tx_input.send(buffer.freeze()).await.is_err() { + panic!("Failed to sequence input bytes"); + } + + // Send an acknowledgment to the client. + if stream.sink().send(Bytes::from("Ok")).await.is_err() { + log::debug!("Failed to send ack to client"); + break; + } + } + } +} + +/// Define how the network server should handle incoming authorities sync requests. +/// The authorities are basically light clients of the sequencer. A real consensus +/// implementation would make sure to receive an ack from the authorities and retry +/// sending until the message is delivered. This is safety-critical. +struct SubscriberServer { + /// Notify the sequencer's core of a new subscriber. + pub tx_subscriber: Sender>, +} + +#[async_trait] +impl<'a, Stream> MessageHandler for SubscriberServer +where + Stream: 'static + RwChannel<'a> + Unpin + Send, +{ + async fn handle_messages(&self, mut stream: Stream) { + let (tx_output, mut rx_output) = channel(100); + + // Notify the core of a new subscriber. + self.tx_subscriber + .send(tx_output) + .await + .expect("Failed to send new subscriber to core"); + + // Update the subscriber every time a certificate is sequenced. + while let Some((bytes, consensus_index)) = rx_output.recv().await { + let message = ConsensusOutput { + message: bytes.to_vec(), + sequencer_number: consensus_index, + }; + let serialized = serialize_consensus_output(&message); + if stream.sink().send(Bytes::from(serialized)).await.is_err() { + log::debug!("Connection dropped by subscriber"); + break; + } + } + } +}