Skip to content

Commit

Permalink
Sui-Narwhal shared objects test (MystenLabs#1531)
Browse files Browse the repository at this point in the history
* add test utilities
* Simple test for shared objects
  • Loading branch information
asonnino authored Apr 26, 2022
1 parent ec5ecd1 commit f765ca6
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 28 deletions.
3 changes: 1 addition & 2 deletions network_utils/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,12 @@ where
let stream;

tokio::select! {
_ = &mut exit_future => { break },
_ = &mut exit_future => break,
result = listener.accept() => {
let (value, _addr) = result?;
stream = value;
}
}

let guarded_state = state.clone();
tokio::spawn(async move {
let framed = TcpDataStream::from_tcp_stream(stream, _buffer_size);
Expand Down
4 changes: 4 additions & 0 deletions sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ jsonrpsee-proc-macros = "0.10.1"

[dev-dependencies]
tracing-test = "0.2.1"
tokio-util = { version = "0.7.0", features = ["codec"] }

test_utils = { path = "../test_utils" }
sui-network = { path = "../network_utils" }

[features]
benchmark = ["narwhal-node/benchmark"]
15 changes: 14 additions & 1 deletion sui/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const AUTHORITIES_DB_NAME: &str = "authorities_db";
pub const DEFAULT_STARTING_PORT: u16 = 10000;
pub const CONSENSUS_DB_NAME: &str = "consensus_db";

static PORT_ALLOCATOR: Lazy<Mutex<PortAllocator>> =
pub static PORT_ALLOCATOR: Lazy<Mutex<PortAllocator>> =
Lazy::new(|| Mutex::new(PortAllocator::new(DEFAULT_STARTING_PORT)));

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand All @@ -56,6 +56,19 @@ pub struct AuthorityPrivateInfo {
pub consensus_address: SocketAddr,
}

impl Clone for AuthorityPrivateInfo {
fn clone(&self) -> Self {
Self {
key_pair: self.key_pair.copy(),
host: self.host.clone(),
port: self.port,
db_path: self.db_path.clone(),
stake: self.stake,
consensus_address: self.consensus_address,
}
}
}

// Custom deserializer with optional default fields
impl<'de> Deserialize<'de> for AuthorityPrivateInfo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
Expand Down
2 changes: 1 addition & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ async fn make_server_with_genesis_ctx(

/// Spawn all the subsystems run by a Sui authority: a consensus node, a sui authority server,
/// and a consensus listener bridging the consensus node and the sui authority.
async fn make_authority(
pub async fn make_authority(
authority: &AuthorityPrivateInfo,
buffer_size: usize,
state: AuthorityState,
Expand Down
68 changes: 68 additions & 0 deletions sui/tests/shared_objects_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use bytes::Bytes;
use futures::{sink::SinkExt, stream::StreamExt};
use sui::config::AuthorityPrivateInfo;
use sui_types::error::SuiError;
use sui_types::messages::ConsensusTransaction;
use sui_types::serialize::SerializedMessage;
use sui_types::serialize::{deserialize_message, serialize_consensus_transaction};
use test_utils::authority::{spawn_test_authorities, test_authority_configs};
use test_utils::messages::test_shared_object_certificates;
use test_utils::objects::{test_gas_objects, test_shared_object};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tokio_util::codec::LengthDelimitedCodec;

/// Submits a transaction to a Sui authority.
async fn submit_transaction(
transaction: Bytes,
config: &AuthorityPrivateInfo,
) -> SerializedMessage {
let authority_address = format!("{}:{}", config.host, config.port);
let stream = TcpStream::connect(authority_address).await.unwrap();
let mut connection = Framed::new(stream, LengthDelimitedCodec::new());

connection.send(transaction).await.unwrap();
let bytes = connection.next().await.unwrap().unwrap();
deserialize_message(&bytes[..]).unwrap()
}

#[tokio::test]
async fn shared_object_transaction() {
let mut objects = test_gas_objects();
objects.push(test_shared_object());

// Get the authority configs and spawn them. Note that it is important to not drop
// the handles (or the authorities will stop).
let configs = test_authority_configs();
let _handles = spawn_test_authorities(objects, &configs).await;

// Make a test shared object certificate.
let certificate = test_shared_object_certificates().await.pop().unwrap();
let message = ConsensusTransaction::UserTransaction(certificate);
let serialized = Bytes::from(serialize_consensus_transaction(&message));

// Keep submitting the certificate until it is sequenced by consensus. We use the loop
// since some consensus protocols (like Tusk) are not guaranteed to include the transaction
// (but it has high probability to do so).
tokio::task::yield_now().await;
'main: loop {
for config in &configs {
match submit_transaction(serialized.clone(), config).await {
SerializedMessage::TransactionResp(_) => {
// We got a reply from the Sui authority.
break 'main;
}
SerializedMessage::Error(error) => match *error {
SuiError::ConsensusConnectionBroken(_) => {
// This is the (confusing) error message returned by the consensus adapter
// timed out and didn't hear back from consensus.
}
error => panic!("Unexpected error {error}"),
},
message => panic!("Unexpected protocol message {message:?}"),
}
}
}
}
1 change: 1 addition & 0 deletions sui_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ signature = "1.5.0"
ed25519-dalek = "1.0.1"
scopeguard = "1.1.0"
clap = { version = "3.1.8", features = ["derive"] }
bincode = "1.3.3"
fdlimit = "0.2.1"

sui-adapter = { path = "../sui_programmability/adapter" }
Expand Down
5 changes: 3 additions & 2 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,15 +813,16 @@ impl ModuleResolver for AuthorityState {

#[async_trait]
impl ExecutionState for AuthorityState {
type Transaction = CertifiedTransaction;
type Transaction = ConsensusTransaction;
type Error = SuiError;

async fn handle_consensus_transaction(
&self,
execution_indices: ExecutionIndices,
transaction: Self::Transaction,
) -> Result<(), Self::Error> {
self.handle_consensus_certificate(transaction, execution_indices)
let ConsensusTransaction::UserTransaction(certificate) = transaction;
self.handle_consensus_certificate(certificate, execution_indices)
.await
}

Expand Down
1 change: 1 addition & 0 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl AuthorityServer {
buffer_size,
state.committee.clone(),
tx_consensus_listener,
/* max_delay */ Duration::from_millis(1_000),
);
Self {
server: NetworkServer::new(base_address, base_port, buffer_size),
Expand Down
34 changes: 21 additions & 13 deletions sui_core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use sui_network::transport::{RwChannel, TcpDataStream};
use sui_types::committee::Committee;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::ConsensusTransaction;
use sui_types::serialize::serialize_consensus_transaction;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};
use tracing::debug;

#[cfg(test)]
Expand Down Expand Up @@ -115,6 +115,8 @@ pub struct ConsensusSubmitter {
committee: Committee,
/// A channel to notify the consensus listener of new transactions.
tx_consensus_listener: Sender<ConsensusInput>,
/// The maximum duration to wait from consensus before aborting the transaction.
max_delay: Duration,
}

impl ConsensusSubmitter {
Expand All @@ -124,12 +126,14 @@ impl ConsensusSubmitter {
buffer_size: usize,
committee: Committee,
tx_consensus_listener: Sender<ConsensusInput>,
max_delay: Duration,
) -> Self {
Self {
consensus_address,
buffer_size,
committee,
tx_consensus_listener,
max_delay,
}
}

Expand All @@ -145,17 +149,11 @@ impl ConsensusSubmitter {
// Check the Sui certificate (submitted by the user).
certificate.check(&self.committee)?;

// Send certificate to consensus
let serialized = serialize_consensus_transaction(certificate);
// Serialize the certificate in a way that is understandable to consensus (i.e., using
// bincode) and it certificate to consensus.
//let serialized = serialize_consensus_transaction(certificate);
let serialized = bincode::serialize(certificate).expect("Failed to serialize Consensus Tx");
let bytes = Bytes::from(serialized.clone());
// TODO [issue #1452]: We are re-creating a connection every time. This is wasteful but does not
// require to take self as a mutable reference.
Self::reconnect(self.consensus_address, self.buffer_size)
.await?
.sink()
.send(bytes.clone())
.await
.map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?;

// Notify the consensus listener that we are expecting to process this certificate.
let (sender, receiver) = oneshot::channel();
Expand All @@ -168,9 +166,19 @@ impl ConsensusSubmitter {
.await
.expect("Failed to notify consensus listener");

// TODO [issue #1452]: We are re-creating a connection every time. This is wasteful but does not
// require to take self as a mutable reference.
Self::reconnect(self.consensus_address, self.buffer_size)
.await?
.sink()
.send(bytes)
.await
.map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?;

// Wait for the consensus to sequence the certificate and assign locks to shared objects.
receiver
timeout(self.max_delay, receiver)
.await
.expect("Failed to receive reply from consensus listener")
.map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?
.expect("Chanel with consensus listener dropped")
}
}
10 changes: 6 additions & 4 deletions sui_core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use sui_types::messages::{
CallArg, CertifiedTransaction, SignatureAggregator, Transaction, TransactionData,
};
use sui_types::object::{MoveObject, Object, Owner};
use sui_types::serialize::{deserialize_message, SerializedMessage};
use test_utils::network::test_listener;
use test_utils::test_keys;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -135,13 +134,15 @@ async fn submit_transaction_to_consensus() {
objects.push(test_shared_object());
let authority = init_state_with_objects(objects).await;
let certificate = test_certificates(&authority).await.pop().unwrap();
let expected_transaction = certificate.transaction.clone();

// Make a new consensus submitter instance.
let submitter = ConsensusSubmitter::new(
consensus_address,
NETWORK_BUFFER_SIZE,
authority.committee,
tx_consensus_listener,
/* max_delay */ Duration::from_millis(1_000),
);

// Spawn a network listener to receive the transaction (emulating the consensus node).
Expand All @@ -161,8 +162,9 @@ async fn submit_transaction_to_consensus() {

// Ensure the consensus node got the transaction.
let bytes = handle.await.unwrap();
match deserialize_message(&bytes[..]).unwrap() {
SerializedMessage::ConsensusTransaction(..) => (),
_ => panic!("Unexpected protocol message"),
match bincode::deserialize(&bytes).unwrap() {
ConsensusTransaction::UserTransaction(x) => {
assert_eq!(x.transaction, expected_transaction)
}
}
}
9 changes: 8 additions & 1 deletion test_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ async-trait = "0.1.52"
rand = "0.7.3"
rocksdb = "0.18.0"
tracing = { version = "0.1.31", features = ["log"] }
tempfile = "3.3.0"
bcs = "0.1.3"

sui-adapter = { path = "../sui_programmability/adapter" }
move-core-types = { git = "https://github.com/move-language/move", rev = "6a80792ecbf16d74bf1d57e48a576377f0879646", features = ["address20"] }
typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev ="e44bca4513a6ff6c97399cd79e82e4bc00571ac3"}
narwhal-config = { git = "https://github.com/MystenLabs/narwhal", rev = "8ae2164f0510349cbac2770e50e853bce5ab0e02", package = "config" }

sui-types = { path = "../sui_types" }
sui-network = { path = "../network_utils" }
sui_core = { path = "../sui_core" }
sui-network = { path = "../network_utils" }
sui = { path = "../sui" }
Loading

0 comments on commit f765ca6

Please sign in to comment.