Skip to content

Commit

Permalink
Direct execution (MystenLabs#1656)
Browse files Browse the repository at this point in the history
Execute all consensus txs
  • Loading branch information
asonnino authored Apr 29, 2022
1 parent ca72f7c commit b92eef1
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 149 deletions.
19 changes: 10 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ move-binary-format = { git = "https://github.com/move-language/move", rev = "4e0
move-bytecode-utils = { git = "https://github.com/move-language/move", rev = "4e025186af502c931318884df53c11bf34a664bc" }
move-unit-test = { git = "https://github.com/move-language/move", rev = "4e025186af502c931318884df53c11bf34a664bc" }

narwhal-node = { git = "https://github.com/MystenLabs/narwhal", rev = "8ae2164f0510349cbac2770e50e853bce5ab0e02", package = "node" }
narwhal-config = { git = "https://github.com/MystenLabs/narwhal", rev = "8ae2164f0510349cbac2770e50e853bce5ab0e02", package = "config" }
narwhal-crypto = { git = "https://github.com/MystenLabs/narwhal", rev = "8ae2164f0510349cbac2770e50e853bce5ab0e02", package = "crypto" }
narwhal-node = { git = "https://github.com/MystenLabs/narwhal", rev = "976355d2ff75aefc6b7ec7fd32742d3d7188c4e0", package = "node" }
narwhal-config = { git = "https://github.com/MystenLabs/narwhal", rev = "976355d2ff75aefc6b7ec7fd32742d3d7188c4e0", package = "config" }
narwhal-crypto = { git = "https://github.com/MystenLabs/narwhal", rev = "976355d2ff75aefc6b7ec7fd32742d3d7188c4e0", package = "crypto" }

once_cell = "1.10.0"

Expand Down
4 changes: 2 additions & 2 deletions sui/src/bin/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async fn main() -> Result<(), anyhow::Error> {

let consensus_committee = network_config.make_narwhal_committee();
let consensus_parameters = ConsensusParameters {
max_header_delay: 5_000,
max_batch_delay: 5_000,
max_header_delay: std::time::Duration::from_millis(5_000),
max_batch_delay: std::time::Duration::from_millis(5_000),
..ConsensusParameters::default()
};
let consensus_store_path = sui_config_dir()?
Expand Down
6 changes: 5 additions & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ pub async fn make_authority(

// Spawn a consensus listener. It listen for consensus outputs and notifies the
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);
ConsensusListener::spawn(
authority_state.clone(),
rx_sui_to_consensus,
rx_consensus_to_sui,
);

// If we have network information make authority clients
// to all authorities in the system.
Expand Down
30 changes: 14 additions & 16 deletions sui/tests/shared_objects_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ async fn submit_single_owner_transaction(

// Keep submitting the certificate until it is sequenced by consensus. We use the loop
// since some consensus protocols (like Tusk) are not guaranteed to include the transaction
// (but it has high probability to do so).
// NOTE: This is good for testing but is not how a real client should submit transactions.
// (but it has high probability to do so, so it should virtually never be used).
async fn submit_shared_object_transaction(
transaction: Transaction,
configs: &[AuthorityPrivateInfo],
Expand All @@ -61,26 +60,25 @@ async fn submit_shared_object_transaction(
let serialized = Bytes::from(serialize_consensus_transaction(&message));

'main: loop {
for config in configs {
match transmit(serialized.clone(), config).await {
SerializedMessage::TransactionResp(reply) => {
// We got a reply from the Sui authority.
break 'main *reply;
}
SerializedMessage::Error(error) => match *error {
SuiError::ConsensusConnectionBroken(_) => {
// This is the (confusing) error message returned by the consensus
// adapter timed out and didn't hear back from consensus.
}
error => panic!("{error}"),
},
message => panic!("Unexpected protocol message {message:?}"),
match transmit(serialized.clone(), &configs[0]).await {
SerializedMessage::TransactionResp(reply) => {
// We got a reply from the Sui authority.
break 'main *reply;
}
SerializedMessage::Error(error) => match *error {
SuiError::ConsensusConnectionBroken(_) => {
// This is the (confusing) error message returned by the consensus
// adapter. It means it didn't hear back from consensus and timed out.
}
error => panic!("{error}"),
},
message => panic!("Unexpected protocol message: {message:?}"),
}
}
}

#[tokio::test]
#[ignore = "Flaky, see #1624"]
async fn shared_object_transaction() {
let mut objects = test_gas_objects();
objects.push(test_shared_object());
Expand Down
4 changes: 2 additions & 2 deletions sui_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ scopeguard = "1.1.0"
clap = { version = "3.1.8", features = ["derive"] }
bincode = "1.3.3"
fdlimit = "0.2.1"
schemars = "0.8.8"

sui-adapter = { path = "../sui_programmability/adapter" }
sui-framework = { path = "../sui_programmability/framework" }
Expand All @@ -41,8 +42,7 @@ move-vm-types = { git = "https://github.com/move-language/move", rev = "4e025186

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "d2976a45420147ad821baae96e6fe4b12215f743"}

narwhal-executor = { git = "https://github.com/MystenLabs/narwhal", rev = "8ae2164f0510349cbac2770e50e853bce5ab0e02", package = "executor" }
schemars = "0.8.8"
narwhal-executor = { git = "https://github.com/MystenLabs/narwhal", rev = "976355d2ff75aefc6b7ec7fd32742d3d7188c4e0", package = "executor" }

[dev-dependencies]
serde-reflection = "0.3.5"
Expand Down
27 changes: 9 additions & 18 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::mpsc::Sender;
use std::time::Duration;
use tracing::{error, info, warn, Instrument};

use crate::consensus_adapter::{ConsensusInput, ConsensusSubmitter};
use crate::consensus_adapter::{ConsensusAdapter, ConsensusInput};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use tokio::sync::broadcast::error::RecvError;
Expand All @@ -41,7 +41,7 @@ const MAX_DELAY_MILLIS: u64 = 5_000; // 5 sec
pub struct AuthorityServer {
server: NetworkServer,
pub state: Arc<AuthorityState>,
consensus_submitter: ConsensusSubmitter,
consensus_adapter: ConsensusAdapter,
}

impl AuthorityServer {
Expand All @@ -53,7 +53,7 @@ impl AuthorityServer {
consensus_address: SocketAddr,
tx_consensus_listener: Sender<ConsensusInput>,
) -> Self {
let consensus_submitter = ConsensusSubmitter::new(
let consensus_adapter = ConsensusAdapter::new(
consensus_address,
buffer_size,
state.committee.clone(),
Expand All @@ -63,7 +63,7 @@ impl AuthorityServer {
Self {
server: NetworkServer::new(base_address, base_port, buffer_size),
state,
consensus_submitter,
consensus_adapter,
}
}

Expand Down Expand Up @@ -256,20 +256,11 @@ impl AuthorityServer {
.handle_batch_streaming(*message, channel)
.await
.map(|_| None),
SerializedMessage::ConsensusTransaction(message) => {
match self.consensus_submitter.submit(&message).await {
Ok(()) => match *message {
ConsensusTransaction::UserTransaction(certificate) => {
let confirmation_transaction = ConfirmationTransaction { certificate };
self.state
.handle_confirmation_transaction(confirmation_transaction)
.await
.map(|info| Some(serialize_transaction_info(&info)))
}
},
Err(e) => Err(e),
}
}
SerializedMessage::ConsensusTransaction(message) => self
.consensus_adapter
.submit(&message)
.await
.map(|info| Some(serialize_transaction_info(&info))),

_ => Err(SuiError::UnexpectedMessage),
};
Expand Down
Loading

0 comments on commit b92eef1

Please sign in to comment.