Skip to content

Commit

Permalink
Add QuorumDriver (MystenLabs#2515)
Browse files Browse the repository at this point in the history
* Add QuorumDriver

* rebase

* Remove timeout completely

* Fix subscription

* Address feedback

* Reorganize data structures

* Remove mutex

* More feedback

* add member
  • Loading branch information
lxfind authored Jun 14, 2022
1 parent a569987 commit e83495a
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 1 deletion.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"crates/sui-node",
"crates/sui-open-rpc",
"crates/sui-open-rpc-macros",
"crates/sui-quorum-driver",
"crates/sui-storage",
"crates/sui-swarm",
"crates/sui-transactional-test-runner",
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ where
/// At that point (and after) enough authorities are up to date with all objects
/// needed to process the certificate that a submission should succeed. However,
/// in case an authority returns an error, we do try to bring it up to speed.
async fn process_certificate(
pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> Result<TransactionEffects, SuiError> {
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/tests/staged/sui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ SuiError:
UnsupportedFeatureError:
STRUCT:
- error: STR
115:
QuorumDriverCommunicationError:
STRUCT:
- error: STR
TransactionDigest:
NEWTYPESTRUCT: BYTES
TransactionEffectsDigest:
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ sui-core = { path = "../sui-core" }
sui-storage = { path = "../sui-storage" }
sui-gateway = { path = "../sui-gateway" }
sui-network = { path = "../sui-network" }
sui-quorum-driver = { path = "../sui-quorum-driver" }

telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" }
mysten-network = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" }
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-quorum-driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "sui-quorum-driver"
version = "0.1.0"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
arc-swap = "1.5.0"
tokio = { version = "1.18.2", features = ["full"] }
tracing = "0.1.34"

sui-core = { path = "../sui-core" }
sui-types = { path = "../sui-types" }

workspace-hack = { path = "../workspace-hack"}
224 changes: 224 additions & 0 deletions crates/sui-quorum-driver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use arc_swap::ArcSwap;
use std::sync::Arc;

use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::log::{error, warn};
use tracing::Instrument;

use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::AuthorityAPI;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
CertifiedTransaction, ExecuteTransactionRequest, ExecuteTransactionRequestType,
ExecuteTransactionResponse, Transaction, TransactionEffects,
};

pub enum QuorumTask<A> {
ProcessTransaction(Transaction),
ProcessCertificate(CertifiedTransaction),
UpdateCommittee(AuthorityAggregator<A>),
}

/// A handler to wrap around QuorumDriver. This handler should be owned by the node with exclusive
/// mutability.
pub struct QuorumDriverHandler<A> {
quorum_driver: Arc<QuorumDriver<A>>,
_processor_handle: JoinHandle<()>,
// TODO: Change to CertifiedTransactionEffects eventually.
effects_subscriber: Receiver<(CertifiedTransaction, TransactionEffects)>,
}

/// The core data structure of the QuorumDriver.
/// It's expected that the QuorumDriver will be wrapped in an `Arc` and shared around.
/// One copy will be used in a json-RPC server to serve transaction execution requests;
/// Another copy will be held by a QuorumDriverHandler to either send signal to update the
/// committee, or to subscribe effects generated from the QuorumDriver.
pub struct QuorumDriver<A> {
validators: ArcSwap<AuthorityAggregator<A>>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
}

impl<A> QuorumDriver<A> {
pub fn new(
validators: AuthorityAggregator<A>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
) -> Self {
Self {
validators: ArcSwap::from(Arc::new(validators)),
task_sender,
effects_subscribe_sender,
}
}
}

impl<A> QuorumDriver<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn execute_transaction(
&self,
request: ExecuteTransactionRequest,
) -> SuiResult<ExecuteTransactionResponse> {
let ExecuteTransactionRequest {
transaction,
request_type,
} = request;
match request_type {
ExecuteTransactionRequestType::ImmediateReturn => {
self.task_sender
.send(QuorumTask::ProcessTransaction(transaction))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})?;
Ok(ExecuteTransactionResponse::ImmediateReturn)
}
ExecuteTransactionRequestType::WaitForTxCert => {
let certificate = self
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await?;
self.task_sender
.send(QuorumTask::ProcessCertificate(certificate.clone()))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})?;
Ok(ExecuteTransactionResponse::TxCert(Box::new(certificate)))
}
ExecuteTransactionRequestType::WaitForEffectsCert => {
let certificate = self
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await?;
let response = self
.process_certificate(certificate)
.instrument(tracing::debug_span!("process_cert"))
.await?;
Ok(ExecuteTransactionResponse::EffectsCert(Box::new(response)))
}
}
}

pub async fn process_transaction(
&self,
transaction: Transaction,
) -> SuiResult<CertifiedTransaction> {
self.validators
.load()
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await
}

pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> SuiResult<(CertifiedTransaction, TransactionEffects)> {
let effects = self
.validators
.load()
.process_certificate(certificate.clone())
.instrument(tracing::debug_span!("process_cert"))
.await?;
let response = (certificate, effects);
// An error to send the result to subscribers should not block returning the result.
if let Err(err) = self.effects_subscribe_sender.send(response.clone()).await {
// TODO: We could potentially retry sending if we want.
error!("{}", err);
}
Ok(response)
}
}

impl<A> QuorumDriverHandler<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(validators: AuthorityAggregator<A>) -> Self {
let (task_tx, task_rx) = mpsc::channel::<QuorumTask<A>>(5000);
let (subscriber_tx, subscriber_rx) = mpsc::channel::<_>(5000);
let quorum_driver = Arc::new(QuorumDriver::new(validators, task_tx, subscriber_tx));
let handle = {
let quorum_driver_copy = quorum_driver.clone();
tokio::task::spawn(async move {
Self::task_queue_processor(quorum_driver_copy, task_rx).await;
})
};
Self {
quorum_driver,
_processor_handle: handle,
effects_subscriber: subscriber_rx,
}
}

pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
self.quorum_driver.clone()
}

pub fn subscribe(&mut self) -> &mut Receiver<(CertifiedTransaction, TransactionEffects)> {
&mut self.effects_subscriber
}

pub async fn update_validators(&self, new_validators: AuthorityAggregator<A>) -> SuiResult {
self.quorum_driver
.task_sender
.send(QuorumTask::UpdateCommittee(new_validators))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})
}

async fn task_queue_processor(
quorum_driver: Arc<QuorumDriver<A>>,
mut task_receiver: Receiver<QuorumTask<A>>,
) {
loop {
if let Some(task) = task_receiver.recv().await {
match task {
QuorumTask::ProcessTransaction(transaction) => {
// TODO: We entered here because callers do not want to wait for a
// transaction to finish execution. When this failed, we do not have a
// way to notify the caller. In the future, we may want to maintain
// some data structure for callers to come back and query the status
// of a transaction latter.
match quorum_driver.process_transaction(transaction).await {
Ok(cert) => {
if let Err(err) = quorum_driver
.task_sender
.send(QuorumTask::ProcessCertificate(cert))
.await
{
error!(
"Sending task to quorum driver queue failed: {}",
err.to_string()
);
}
}
Err(err) => {
warn!("Transaction processing failed: {:?}", err);
}
}
}
QuorumTask::ProcessCertificate(certificate) => {
// TODO: Similar to ProcessTransaction, we may want to allow callers to
// query the status.
if let Err(err) = quorum_driver.process_certificate(certificate).await {
warn!("Certificate processing failed: {:?}", err);
}
}
QuorumTask::UpdateCommittee(new_validators) => {
quorum_driver.validators.store(Arc::new(new_validators));
}
}
}
}
}
}
3 changes: 3 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ pub enum SuiError {

#[error("Use of disabled feature: {:?}", error)]
UnsupportedFeatureError { error: String },

#[error("Unable to communicate with the Quorum Driver channel: {:?}", error)]
QuorumDriverCommunicationError { error: String },
}

pub type SuiResult<T = ()> = Result<T, SuiError>;
Expand Down
22 changes: 22 additions & 0 deletions crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::{
collections::{BTreeSet, HashSet},
hash::{Hash, Hasher},
};

#[cfg(test)]
#[path = "unit_tests/messages_tests.rs"]
mod messages_tests;
Expand Down Expand Up @@ -1344,3 +1345,24 @@ impl ConsensusTransaction {
}
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ExecuteTransactionRequestType {
ImmediateReturn,
WaitForTxCert,
WaitForEffectsCert,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ExecuteTransactionRequest {
pub transaction: Transaction,
pub request_type: ExecuteTransactionRequestType,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ExecuteTransactionResponse {
ImmediateReturn,
TxCert(Box<CertifiedTransaction>),
// TODO: Change to CertifiedTransactionEffects eventually.
EffectsCert(Box<(CertifiedTransaction, TransactionEffects)>),
}
1 change: 1 addition & 0 deletions crates/sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ sui-open-rpc-macros = { path = "../sui-open-rpc-macros" }
sui-json = { path = "../sui-json" }
sui-gateway = { path = "../sui-gateway" }
sui-node = { path = "../sui-node" }
sui-quorum-driver = { path = "../sui-quorum-driver" }
sui-swarm = { path = "../sui-swarm" }

rustyline = "9.1.2"
Expand Down
Loading

0 comments on commit e83495a

Please sign in to comment.