From e83495a5f9a816517990f2e01c6436ce965498da Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 13 Jun 2022 17:35:30 -0700 Subject: [PATCH] Add QuorumDriver (#2515) * Add QuorumDriver * rebase * Remove timeout completely * Fix subscription * Address feedback * Reorganize data structures * Remove mutex * More feedback * add member --- Cargo.lock | 14 ++ Cargo.toml | 1 + crates/sui-core/src/authority_aggregator.rs | 2 +- crates/sui-core/tests/staged/sui.yaml | 4 + crates/sui-node/Cargo.toml | 1 + crates/sui-quorum-driver/Cargo.toml | 17 ++ crates/sui-quorum-driver/src/lib.rs | 224 ++++++++++++++++++++ crates/sui-types/src/error.rs | 3 + crates/sui-types/src/messages.rs | 22 ++ crates/sui/Cargo.toml | 1 + crates/sui/tests/quorum_driver_tests.rs | 144 +++++++++++++ crates/test-utils/src/messages.rs | 13 ++ 12 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 crates/sui-quorum-driver/Cargo.toml create mode 100644 crates/sui-quorum-driver/src/lib.rs create mode 100644 crates/sui/tests/quorum_driver_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 907c7322c0133..c7c6483128f99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5845,6 +5845,7 @@ dependencies = [ "sui-node", "sui-open-rpc", "sui-open-rpc-macros", + "sui-quorum-driver", "sui-storage", "sui-swarm", "sui-types", @@ -6154,6 +6155,7 @@ dependencies = [ "sui-core", "sui-gateway", "sui-network", + "sui-quorum-driver", "sui-storage", "telemetry-subscribers", "tokio", @@ -6182,6 +6184,18 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "sui-quorum-driver" +version = "0.1.0" +dependencies = [ + "arc-swap", + "sui-core", + "sui-types", + "tokio", + "tracing", + "workspace-hack", +] + [[package]] name = "sui-storage" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index bdaddde4919f9..0262b54aab7e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "crates/sui-node", "crates/sui-open-rpc", "crates/sui-open-rpc-macros", + "crates/sui-quorum-driver", 
"crates/sui-storage", "crates/sui-swarm", "crates/sui-transactional-test-runner", diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index f4fc8ccc87e68..5e83c39d1f669 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -1066,7 +1066,7 @@ where /// At that point (and after) enough authorities are up to date with all objects /// needed to process the certificate that a submission should succeed. However, /// in case an authority returns an error, we do try to bring it up to speed. - async fn process_certificate( + pub async fn process_certificate( &self, certificate: CertifiedTransaction, ) -> Result { diff --git a/crates/sui-core/tests/staged/sui.yaml b/crates/sui-core/tests/staged/sui.yaml index 063813ba8d74d..87cbae5008329 100644 --- a/crates/sui-core/tests/staged/sui.yaml +++ b/crates/sui-core/tests/staged/sui.yaml @@ -665,6 +665,10 @@ SuiError: UnsupportedFeatureError: STRUCT: - error: STR + 115: + QuorumDriverCommunicationError: + STRUCT: + - error: STR TransactionDigest: NEWTYPESTRUCT: BYTES TransactionEffectsDigest: diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index 36d1f29e56f7e..ed8f674a45d85 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -22,6 +22,7 @@ sui-core = { path = "../sui-core" } sui-storage = { path = "../sui-storage" } sui-gateway = { path = "../sui-gateway" } sui-network = { path = "../sui-network" } +sui-quorum-driver = { path = "../sui-quorum-driver" } telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" } mysten-network = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" } diff --git a/crates/sui-quorum-driver/Cargo.toml b/crates/sui-quorum-driver/Cargo.toml new file mode 100644 index 0000000000000..af7f8899e5a57 --- /dev/null +++ 
b/crates/sui-quorum-driver/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sui-quorum-driver" +version = "0.1.0" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +arc-swap = "1.5.0" +tokio = { version = "1.18.2", features = ["full"] } +tracing = "0.1.34" + +sui-core = { path = "../sui-core" } +sui-types = { path = "../sui-types" } + +workspace-hack = { path = "../workspace-hack"} diff --git a/crates/sui-quorum-driver/src/lib.rs b/crates/sui-quorum-driver/src/lib.rs new file mode 100644 index 0000000000000..d1e7dfa22a2cb --- /dev/null +++ b/crates/sui-quorum-driver/src/lib.rs @@ -0,0 +1,224 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use arc_swap::ArcSwap; +use std::sync::Arc; + +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::task::JoinHandle; +use tracing::log::{error, warn}; +use tracing::Instrument; + +use sui_core::authority_aggregator::AuthorityAggregator; +use sui_core::authority_client::AuthorityAPI; +use sui_types::error::{SuiError, SuiResult}; +use sui_types::messages::{ + CertifiedTransaction, ExecuteTransactionRequest, ExecuteTransactionRequestType, + ExecuteTransactionResponse, Transaction, TransactionEffects, +}; + +pub enum QuorumTask { + ProcessTransaction(Transaction), + ProcessCertificate(CertifiedTransaction), + UpdateCommittee(AuthorityAggregator), +} + +/// A handler to wrap around QuorumDriver. This handler should be owned by the node with exclusive +/// mutability. +pub struct QuorumDriverHandler { + quorum_driver: Arc>, + _processor_handle: JoinHandle<()>, + // TODO: Change to CertifiedTransactionEffects eventually. + effects_subscriber: Receiver<(CertifiedTransaction, TransactionEffects)>, +} + +/// The core data structure of the QuorumDriver. +/// It's expected that the QuorumDriver will be wrapped in an `Arc` and shared around. 
+/// One copy will be used in a json-RPC server to serve transaction execution requests; +/// Another copy will be held by a QuorumDriverHandler to either send signal to update the +/// committee, or to subscribe effects generated from the QuorumDriver. +pub struct QuorumDriver { + validators: ArcSwap>, + task_sender: Sender>, + effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>, +} + +impl QuorumDriver { + pub fn new( + validators: AuthorityAggregator, + task_sender: Sender>, + effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>, + ) -> Self { + Self { + validators: ArcSwap::from(Arc::new(validators)), + task_sender, + effects_subscribe_sender, + } + } +} + +impl QuorumDriver +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + pub async fn execute_transaction( + &self, + request: ExecuteTransactionRequest, + ) -> SuiResult { + let ExecuteTransactionRequest { + transaction, + request_type, + } = request; + match request_type { + ExecuteTransactionRequestType::ImmediateReturn => { + self.task_sender + .send(QuorumTask::ProcessTransaction(transaction)) + .await + .map_err(|err| SuiError::QuorumDriverCommunicationError { + error: err.to_string(), + })?; + Ok(ExecuteTransactionResponse::ImmediateReturn) + } + ExecuteTransactionRequestType::WaitForTxCert => { + let certificate = self + .process_transaction(transaction) + .instrument(tracing::debug_span!("process_tx")) + .await?; + self.task_sender + .send(QuorumTask::ProcessCertificate(certificate.clone())) + .await + .map_err(|err| SuiError::QuorumDriverCommunicationError { + error: err.to_string(), + })?; + Ok(ExecuteTransactionResponse::TxCert(Box::new(certificate))) + } + ExecuteTransactionRequestType::WaitForEffectsCert => { + let certificate = self + .process_transaction(transaction) + .instrument(tracing::debug_span!("process_tx")) + .await?; + let response = self + .process_certificate(certificate) + .instrument(tracing::debug_span!("process_cert")) + 
.await?; + Ok(ExecuteTransactionResponse::EffectsCert(Box::new(response))) + } + } + } + + pub async fn process_transaction( + &self, + transaction: Transaction, + ) -> SuiResult { + self.validators + .load() + .process_transaction(transaction) + .instrument(tracing::debug_span!("process_tx")) + .await + } + + pub async fn process_certificate( + &self, + certificate: CertifiedTransaction, + ) -> SuiResult<(CertifiedTransaction, TransactionEffects)> { + let effects = self + .validators + .load() + .process_certificate(certificate.clone()) + .instrument(tracing::debug_span!("process_cert")) + .await?; + let response = (certificate, effects); + // An error to send the result to subscribers should not block returning the result. + if let Err(err) = self.effects_subscribe_sender.send(response.clone()).await { + // TODO: We could potentially retry sending if we want. + error!("{}", err); + } + Ok(response) + } +} + +impl QuorumDriverHandler +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + pub fn new(validators: AuthorityAggregator) -> Self { + let (task_tx, task_rx) = mpsc::channel::>(5000); + let (subscriber_tx, subscriber_rx) = mpsc::channel::<_>(5000); + let quorum_driver = Arc::new(QuorumDriver::new(validators, task_tx, subscriber_tx)); + let handle = { + let quorum_driver_copy = quorum_driver.clone(); + tokio::task::spawn(async move { + Self::task_queue_processor(quorum_driver_copy, task_rx).await; + }) + }; + Self { + quorum_driver, + _processor_handle: handle, + effects_subscriber: subscriber_rx, + } + } + + pub fn clone_quorum_driver(&self) -> Arc> { + self.quorum_driver.clone() + } + + pub fn subscribe(&mut self) -> &mut Receiver<(CertifiedTransaction, TransactionEffects)> { + &mut self.effects_subscriber + } + + pub async fn update_validators(&self, new_validators: AuthorityAggregator) -> SuiResult { + self.quorum_driver + .task_sender + .send(QuorumTask::UpdateCommittee(new_validators)) + .await + .map_err(|err| 
SuiError::QuorumDriverCommunicationError { + error: err.to_string(), + }) + } + + async fn task_queue_processor( + quorum_driver: Arc>, + mut task_receiver: Receiver>, + ) { + loop { + if let Some(task) = task_receiver.recv().await { + match task { + QuorumTask::ProcessTransaction(transaction) => { + // TODO: We entered here because callers do not want to wait for a + // transaction to finish execution. When this fails, we do not have a + // way to notify the caller. In the future, we may want to maintain + // some data structure for callers to come back and query the status + // of a transaction later. + match quorum_driver.process_transaction(transaction).await { + Ok(cert) => { + if let Err(err) = quorum_driver + .task_sender + .send(QuorumTask::ProcessCertificate(cert)) + .await + { + error!( + "Sending task to quorum driver queue failed: {}", + err.to_string() + ); + } + } + Err(err) => { + warn!("Transaction processing failed: {:?}", err); + } + } + } + QuorumTask::ProcessCertificate(certificate) => { + // TODO: Similar to ProcessTransaction, we may want to allow callers to + // query the status. 
+ if let Err(err) = quorum_driver.process_certificate(certificate).await { + warn!("Certificate processing failed: {:?}", err); + } + } + QuorumTask::UpdateCommittee(new_validators) => { + quorum_driver.validators.store(Arc::new(new_validators)); + } + } + } + } + } +} diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs index 6e0c0e2baaabb..671e3efcddca8 100644 --- a/crates/sui-types/src/error.rs +++ b/crates/sui-types/src/error.rs @@ -358,6 +358,9 @@ pub enum SuiError { #[error("Use of disabled feature: {:?}", error)] UnsupportedFeatureError { error: String }, + + #[error("Unable to communicate with the Quorum Driver channel: {:?}", error)] + QuorumDriverCommunicationError { error: String }, } pub type SuiResult = Result; diff --git a/crates/sui-types/src/messages.rs b/crates/sui-types/src/messages.rs index 6f143ffe34faf..bc4cf144f59c4 100644 --- a/crates/sui-types/src/messages.rs +++ b/crates/sui-types/src/messages.rs @@ -32,6 +32,7 @@ use std::{ collections::{BTreeSet, HashSet}, hash::{Hash, Hasher}, }; + #[cfg(test)] #[path = "unit_tests/messages_tests.rs"] mod messages_tests; @@ -1344,3 +1345,24 @@ impl ConsensusTransaction { } } } + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ExecuteTransactionRequestType { + ImmediateReturn, + WaitForTxCert, + WaitForEffectsCert, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ExecuteTransactionRequest { + pub transaction: Transaction, + pub request_type: ExecuteTransactionRequestType, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ExecuteTransactionResponse { + ImmediateReturn, + TxCert(Box), + // TODO: Change to CertifiedTransactionEffects eventually. 
+ EffectsCert(Box<(CertifiedTransaction, TransactionEffects)>), +} diff --git a/crates/sui/Cargo.toml b/crates/sui/Cargo.toml index 79c9ac3e3a58f..56b6867e8f6ca 100644 --- a/crates/sui/Cargo.toml +++ b/crates/sui/Cargo.toml @@ -52,6 +52,7 @@ sui-open-rpc-macros = { path = "../sui-open-rpc-macros" } sui-json = { path = "../sui-json" } sui-gateway = { path = "../sui-gateway" } sui-node = { path = "../sui-node" } +sui-quorum-driver = { path = "../sui-quorum-driver" } sui-swarm = { path = "../sui-swarm" } rustyline = "9.1.2" diff --git a/crates/sui/tests/quorum_driver_tests.rs b/crates/sui/tests/quorum_driver_tests.rs new file mode 100644 index 0000000000000..9d99bef70d192 --- /dev/null +++ b/crates/sui/tests/quorum_driver_tests.rs @@ -0,0 +1,144 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; +use sui_core::authority_aggregator::AuthorityAggregator; +use sui_core::authority_client::NetworkAuthorityClient; +use sui_node::SuiNode; +use sui_quorum_driver::QuorumDriverHandler; +use sui_types::base_types::SuiAddress; +use sui_types::messages::{ + ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse, + Transaction, +}; +use test_utils::authority::{ + spawn_test_authorities, test_authority_aggregator, test_authority_configs, +}; +use test_utils::messages::make_transfer_sui_transaction; +use test_utils::objects::test_gas_objects; + +async fn setup() -> ( + Vec, + AuthorityAggregator, + Transaction, +) { + let mut gas_objects = test_gas_objects(); + let configs = test_authority_configs(); + let handles = spawn_test_authorities(gas_objects.clone(), &configs).await; + let clients = test_authority_aggregator(&configs); + let tx = make_transfer_sui_transaction(gas_objects.pop().unwrap(), SuiAddress::default()); + (handles, clients, tx) +} + +#[tokio::test] +async fn test_execute_transaction_immediate() { + let (_handles, clients, tx) = setup().await; + let digest = *tx.digest(); + + 
let mut quorum_driver_handler = QuorumDriverHandler::new(clients); + let quorum_driver = quorum_driver_handler.clone_quorum_driver(); + let handle = tokio::task::spawn(async move { + let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); + assert_eq!(*cert.digest(), digest); + assert_eq!(effects.transaction_digest, digest); + }); + assert!(matches!( + quorum_driver + .execute_transaction(ExecuteTransactionRequest { + transaction: tx, + request_type: ExecuteTransactionRequestType::ImmediateReturn, + }) + .await + .unwrap(), + ExecuteTransactionResponse::ImmediateReturn + )); + + handle.await.unwrap(); +} + +#[tokio::test] +async fn test_execute_transaction_wait_for_cert() { + let (_handles, clients, tx) = setup().await; + let digest = *tx.digest(); + + let mut quorum_driver_handler = QuorumDriverHandler::new(clients); + let quorum_driver = quorum_driver_handler.clone_quorum_driver(); + let handle = tokio::task::spawn(async move { + let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); + assert_eq!(*cert.digest(), digest); + assert_eq!(effects.transaction_digest, digest); + }); + if let ExecuteTransactionResponse::TxCert(cert) = quorum_driver + .execute_transaction(ExecuteTransactionRequest { + transaction: tx, + request_type: ExecuteTransactionRequestType::WaitForTxCert, + }) + .await + .unwrap() + { + assert_eq!(*cert.digest(), digest); + } else { + unreachable!(); + } + + handle.await.unwrap(); +} + +#[tokio::test] +async fn test_execute_transaction_wait_for_effects() { + let (_handles, clients, tx) = setup().await; + let digest = *tx.digest(); + + let mut quorum_driver_handler = QuorumDriverHandler::new(clients); + let quorum_driver = quorum_driver_handler.clone_quorum_driver(); + let handle = tokio::task::spawn(async move { + let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); + assert_eq!(*cert.digest(), digest); + assert_eq!(effects.transaction_digest, digest); + }); + if let 
ExecuteTransactionResponse::EffectsCert(result) = quorum_driver + .execute_transaction(ExecuteTransactionRequest { + transaction: tx, + request_type: ExecuteTransactionRequestType::WaitForEffectsCert, + }) + .await + .unwrap() + { + let (cert, effects) = *result; + assert_eq!(*cert.digest(), digest); + assert_eq!(effects.transaction_digest, digest); + } else { + unreachable!(); + } + + handle.await.unwrap(); +} + +#[tokio::test] +async fn test_update_validators() { + let (_handles, mut clients, tx) = setup().await; + let quorum_driver_handler = QuorumDriverHandler::new(clients.clone()); + let quorum_driver = quorum_driver_handler.clone_quorum_driver(); + let handle = tokio::task::spawn(async move { + // Wait till the epoch/committee is updated. + tokio::time::sleep(Duration::from_secs(3)).await; + + let result = quorum_driver + .execute_transaction(ExecuteTransactionRequest { + transaction: tx, + request_type: ExecuteTransactionRequestType::WaitForEffectsCert, + }) + .await; + // This now will fail due to epoch mismatch. + assert!(result.is_err()); + }); + + // Create a new authority aggregator with a new epoch number, and update the quorum driver. 
+ clients.committee.epoch = 10; + quorum_driver_handler + .update_validators(clients) + .await + .unwrap(); + + handle.await.unwrap(); +} diff --git a/crates/test-utils/src/messages.rs b/crates/test-utils/src/messages.rs index d31dfaf731205..e40cdf6185fbc 100644 --- a/crates/test-utils/src/messages.rs +++ b/crates/test-utils/src/messages.rs @@ -133,6 +133,19 @@ pub fn create_publish_move_package_transaction(gas_object: Object, path: PathBuf Transaction::new(data, signature) } +pub fn make_transfer_sui_transaction(gas_object: Object, recipient: SuiAddress) -> Transaction { + let (sender, keypair) = test_keys().pop().unwrap(); + let data = TransactionData::new_transfer_sui( + recipient, + sender, + None, + gas_object.compute_object_reference(), + MAX_GAS, + ); + let signature = Signature::new(&data, &keypair); + Transaction::new(data, signature) +} + /// Make a transaction calling a specific move module & function. pub fn move_transaction( gas_object: Object,