Skip to content

Commit

Permalink
Multi-process shared-objects execution (MystenLabs#2059)
Browse files Browse the repository at this point in the history
* Multi-process shared-object execution
* Fix narwhal configs for tests
  • Loading branch information
asonnino authored May 19, 2022
1 parent 219a402 commit ca9984c
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 98 deletions.
6 changes: 3 additions & 3 deletions crates/sui-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct ValidatorConfig {
network_address: Multiaddr,
metrics_address: Multiaddr,

consensus_config: ConsensuseConfig,
pub consensus_config: ConsensuseConfig,
committee_config: CommitteeConfig,

genesis: genesis::Genesis,
Expand Down Expand Up @@ -85,7 +85,7 @@ pub struct ConsensuseConfig {
//TODO make narwhal config serializable
#[serde(skip_serializing)]
#[serde(default)]
narwhal_config: DebugIgnore<ConsensusParameters>,
pub narwhal_config: DebugIgnore<ConsensusParameters>,
}

impl ConsensuseConfig {
Expand Down Expand Up @@ -164,7 +164,7 @@ impl ValidatorInfo {
/// all validators
#[derive(Debug, Deserialize, Serialize)]
pub struct NetworkConfig {
validator_configs: Vec<ValidatorConfig>,
pub validator_configs: Vec<ValidatorConfig>,
loaded_move_packages: Vec<(PathBuf, ObjectID)>,
genesis: genesis::Genesis,
pub account_keys: Vec<KeyPair>,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub enum SuiError {
#[error("Consensus listener is out of capacity")]
ListenerCapacityExceeded,
#[error("Failed to serialize/deserialize Narwhal message: {0}")]
ConsensusNarwhalSerializationError(String),
ConsensusSuiSerializationError(String),
#[error("Only shared object transactions need to be sequenced")]
NotASharedObjectTransaction,

Expand Down
85 changes: 55 additions & 30 deletions sui/tests/shared_objects_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_core::{
use sui_types::object::OBJECT_START_VERSION;
use sui_types::{
base_types::ObjectRef,
error::{SuiError, SuiResult},
error::SuiResult,
messages::{
CallArg, ConfirmationTransaction, ConsensusTransaction, ExecutionStatus, Transaction,
TransactionInfoResponse,
Expand Down Expand Up @@ -72,8 +72,14 @@ async fn submit_shared_object_transaction(
let replies: Vec<_> = futures::future::join_all(futures)
.await
.into_iter()
// Remove all `ConsensusConnectionBroken` replies.
.filter(|result| !matches!(result, Err(SuiError::ConsensusConnectionBroken(..))))
// Remove all `ConsensusConnectionBroken` replies. Note that the original Sui error type
// `SuiError::ConsensusConnectionBroken(..)` is lost when the message is sent through the
// network (it is replaced by `RpcError`). As a result, the following filter doesn't work:
// `.filter(|result| !matches!(result, Err(SuiError::ConsensusConnectionBroken(..))))`.
.filter(|result| match result {
Err(e) => !e.to_string().contains("deadline has elapsed"),
_ => true,
})
.collect();

if !replies.is_empty() {
Expand Down Expand Up @@ -482,6 +488,7 @@ async fn replay_shared_object_transaction() {
}

#[tokio::test]
//#[ignore] // cargo test gateway -p sui --test shared_objects_tests -- --nocapture
async fn shared_object_on_gateway() {
let mut gas_objects = test_gas_objects();

Expand Down Expand Up @@ -516,27 +523,38 @@ async fn shared_object_on_gateway() {
// We need to have one gas object left for the final value check.
let last_gas_object = gas_objects.pop().unwrap();
let increment_amount = gas_objects.len();
let futures: Vec<_> = gas_objects
.into_iter()
.map(|gas_object| {
let g = gateway.clone();
let increment_counter_transaction = move_transaction(
gas_object,
"Counter",
"increment",
package_ref,
/* arguments */ vec![CallArg::SharedObject(shared_object_id)],
);
async move { g.execute_transaction(increment_counter_transaction).await }
})
.collect();

let replies: Vec<_> = futures::future::join_all(futures)
.await
.into_iter()
.collect();
assert_eq!(replies.len(), increment_amount);
assert!(replies.iter().all(|result| result.is_ok()));

// It may happen that no authorities manage to get their transaction sequenced by consensus
// (we may be unlucky and consensus may drop all our transactions). It would have been nice
// to only filter "timeout" errors, but the gateway simply returns `anyhow::Error`; this
// will be fixed by issue #1717. Note that the gateway has an internal retry mechanism but
// it is not an infinite loop.
loop {
let futures: Vec<_> = gas_objects
.iter()
.cloned()
.map(|gas_object| {
let g = gateway.clone();
let increment_counter_transaction = move_transaction(
gas_object,
"Counter",
"increment",
package_ref,
/* arguments */ vec![CallArg::SharedObject(shared_object_id)],
);
async move { g.execute_transaction(increment_counter_transaction).await }
})
.collect();

let replies: Vec<_> = futures::future::join_all(futures)
.await
.into_iter()
.collect();
assert_eq!(replies.len(), increment_amount);
if replies.iter().all(|result| result.is_ok()) {
break;
}
}

let assert_value_transaction = move_transaction(
last_gas_object,
Expand All @@ -548,10 +566,17 @@ async fn shared_object_on_gateway() {
CallArg::Pure((increment_amount as u64).to_le_bytes().to_vec()),
],
);
let resp = gateway
.execute_transaction(assert_value_transaction)
.await
.unwrap();
let effects = resp.to_effect_response().unwrap().effects;
assert!(effects.status.is_ok());

// Same problem may happen here (consensus may drop transactions).
loop {
let result = gateway
.clone()
.execute_transaction(assert_value_transaction.clone())
.await;
if let Ok(response) = result {
let effects = response.to_effect_response().unwrap().effects;
assert!(effects.status.is_ok());
break;
}
}
}
83 changes: 49 additions & 34 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,13 @@ impl AuthorityState {
&self,
certificate: CertifiedTransaction,
) -> Result<Option<TransactionInfoResponse>, SuiError> {
// Ensure it is a shared object certificate
// Ensure the input is a shared object certificate
fp_ensure!(
certificate.contains_shared_object(),
SuiError::NotASharedObjectTransaction
);

// If we already executed this transaction, return the sign effects.
// If we already executed this transaction, return the signed effects.
let digest = certificate.digest();
if self.database.effects_exists(digest)? {
debug!("Shared-object transaction {digest:?} already executed");
Expand All @@ -511,20 +511,18 @@ impl AuthorityState {
// If we already assigned locks to this transaction, we can try to execute it immediately.
// This can happen to transaction previously submitted to consensus that failed execution
// due to missing dependencies.
match self
.database
.sequenced(digest, certificate.shared_input_objects())?[0]
{
Some(_) => {
// Attempt to execute the transaction. This will only succeed if the authority
// already executed all its dependencies.
let confirmation_transaction = ConfirmationTransaction { certificate };
self.handle_confirmation_transaction(confirmation_transaction.clone())
.await
.map(Some)
}
None => Ok(None),
if self.shared_locks_exist(&certificate).await? {
// Attempt to execute the transaction. This will only succeed if the authority
// already executed all its dependencies and if the locks are correctly attributed to
// the transaction (i.e., this transaction is the next to be executed).
debug!("Shared-locks already assigned to {digest:?} - executing now");
let confirmation = ConfirmationTransaction { certificate };
return self.process_certificate(confirmation).await.map(Some);
}

// If we have not already assigned shared locks to this transaction, it needs to go
// through consensus.
Ok(None)
}

pub async fn handle_transaction_info_request(
Expand Down Expand Up @@ -975,6 +973,14 @@ impl AuthorityState {
// implicitly we drop the ticket here and that notifies the batch manager
}

/// Report whether shared-locks have already been assigned to a shared-object certificate.
///
/// Queries the database for the sequenced entries matching the certificate's digest and its
/// shared input objects, and returns `true` when the first entry is set. Any database error
/// is propagated to the caller.
///
/// NOTE(review): the first entry is indexed directly, so this presumably relies on the
/// certificate having at least one shared input object — the callers visible here check
/// `contains_shared_object()` beforehand; confirm for any new caller.
async fn shared_locks_exist(&self, certificate: &CertifiedTransaction) -> SuiResult<bool> {
    let locks = self
        .database
        .sequenced(certificate.digest(), certificate.shared_input_objects())?;
    Ok(locks[0].is_some())
}

/// Get a read reference to an object/seq lock
pub async fn get_transaction_lock(
&self,
Expand Down Expand Up @@ -1046,31 +1052,40 @@ impl ExecutionState for AuthorityState {
) -> Result<Vec<u8>, Self::Error> {
let ConsensusTransaction::UserTransaction(certificate) = transaction;

// Ensure an idempotent answer.
// Ensure the input is a shared object certificate. Remember that Byzantine authorities
// may input anything into consensus.
fp_ensure!(
certificate.contains_shared_object(),
SuiError::NotASharedObjectTransaction
);

// If we already executed this transaction, return the signed effects.
let digest = certificate.digest();
if self.database.effects_exists(digest)? {
let info = self.make_transaction_info(digest).await?;
debug!(tx_digest =? digest, "Shared-object transaction already executed");
return Ok(bincode::serialize(&info).unwrap());
let info = self.make_transaction_info(digest).await?;
return Ok(bincode::serialize(&info).expect("Failed to serialize tx info"));
}

// Assign locks to shared objects.
self.handle_consensus_certificate(certificate.clone(), execution_indices)
.await?;
debug!(tx_digest =? digest, "Shared objects locks successfully attributed");

// Attempt to execute the transaction. This will only succeed if the authority
// already executed all its dependencies.
let confirmation_transaction = ConfirmationTransaction {
certificate: certificate.clone(),
};
let info = self
.handle_confirmation_transaction(confirmation_transaction.clone())
.await?;
debug!(tx_digest =? digest, "Executed consensus transaction");
// If we have not already assigned shared-locks to this transaction, we do it now.
if !self.shared_locks_exist(&certificate).await? {
// Check the certificate. Remember that Byzantine authorities may input anything into
// consensus.
certificate.verify(&self.committee)?;

// Persist the certificate since we are about to lock one or more shared objects.
// We thus need to make sure someone (if not the client) can continue the protocol.
// Also atomically lock the shared objects for this particular transaction and
// increment the last consensus index. Note that only a single process can ever call
// this function and that the last consensus index is also kept in memory. It is
// thus ok to only persist now (even though this function may have returned earlier).
// In the worst case, the synchronizer of the consensus client will catch up.
self.database
.persist_certificate_and_lock_shared_objects(certificate, execution_indices)?;
}

// Return a serialized transaction info response. This will be sent back to the client.
Ok(bincode::serialize(&info).unwrap())
// TODO: This return type is not ideal.
Ok(Vec::default())
}

fn ask_consensus_write_lock(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ where
}

/// This function takes an initial state, then executes an asynchronous function (FMap) for each
/// uthority, and folds the results as they become available into the state using an async function (FReduce).
/// authority, and folds the results as they become available into the state using an async function (FReduce).
///
/// FMap can do io, and returns a result V. An error there may not be fatal, and could be consumed by the
/// FReduce function to overall recover from it. This is necessary to ensure byzantine authorities cannot
Expand Down
1 change: 1 addition & 0 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl AuthorityServer {
tx_consensus_listener: Sender<ConsensusListenerMessage>,
) -> Self {
let consensus_adapter = ConsensusAdapter::new(
state.clone(),
consensus_address,
state.committee.clone(),
tx_consensus_listener,
Expand Down
38 changes: 26 additions & 12 deletions sui_core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::authority::AuthorityState;
use bytes::Bytes;
use multiaddr::Multiaddr;
use narwhal_executor::SubscriberResult;
use narwhal_types::TransactionProto;
use narwhal_types::TransactionsClient;
use std::sync::Arc;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
};
use sui_types::messages::ConfirmationTransaction;
use sui_types::{
committee::Committee,
error::{SuiError, SuiResult},
Expand Down Expand Up @@ -57,8 +60,9 @@ type ConsensusOutput = (

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
/// The network address of the consensus node.
_consensus_address: Multiaddr,
/// The authority's state.
state: Arc<AuthorityState>,
/// The network client connecting to the consensus node of this authority.
consensus_client: TransactionsClient<sui_network::tonic::transport::Channel>,
/// The Sui committee information.
committee: Committee,
Expand All @@ -71,18 +75,20 @@ pub struct ConsensusAdapter {
}

impl ConsensusAdapter {
/// Make a new Consensus submitter instance.
/// Make a new Consensus adapter instance.
pub fn new(
state: Arc<AuthorityState>,
consensus_address: Multiaddr,
committee: Committee,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
max_delay: Duration,
) -> Self {
let consensus_client = TransactionsClient::new(
mysten_network::client::connect_lazy(&consensus_address).unwrap(),
mysten_network::client::connect_lazy(&consensus_address)
.expect("Failed to connect to consensus"),
);
Self {
_consensus_address: consensus_address,
state,
consensus_client,
committee,
tx_consensus_listener,
Expand Down Expand Up @@ -131,7 +137,7 @@ impl ConsensusAdapter {
// certificate will be sequenced. So the best we can do is to set a timer and notify the
// client to retry if we timeout without hearing back from consensus (this module does not
// handle retries). The best timeout value depends on the consensus protocol.
let resp = match timeout(self.max_delay, receiver).await {
let info = match timeout(self.max_delay, receiver).await {
Ok(reply) => reply.expect("Failed to read back from consensus listener"),
Err(e) => {
let message = ConsensusListenerMessage::Cleanup(serialized);
Expand All @@ -141,12 +147,20 @@ impl ConsensusAdapter {
.expect("Cleanup channel with consensus listener dropped");
Err(SuiError::ConsensusConnectionBroken(e.to_string()))
}
};

resp.and_then(|r| {
bincode::deserialize(&r)
.map_err(|e| SuiError::ConsensusNarwhalSerializationError(e.to_string()))
})
}?;

if info.is_empty() {
// Consensus successfully assigned shared-locks to this certificate.
let ConsensusTransaction::UserTransaction(certificate) = certificate.clone();
let confirmation_transaction = ConfirmationTransaction { certificate };
self.state
.handle_confirmation_transaction(confirmation_transaction)
.await
} else {
// This certificate has already been executed.
bincode::deserialize(&info)
.map_err(|e| SuiError::ConsensusSuiSerializationError(e.to_string()))
}
}
}

Expand Down
Loading

0 comments on commit ca9984c

Please sign in to comment.