Skip to content

Commit

Permalink
[Onboard quorum driver to fullnode] 3/n add cluster test case (Mysten…
Browse files Browse the repository at this point in the history
…Labs#3867)

* add execute transaction end points

* add execute txn endpoint

* add cluster test

* rebase

* get gas objs

* fmt
  • Loading branch information
longbowlu authored Aug 17, 2022
1 parent c6f835e commit a0cc1c5
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions crates/sui-cluster-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use config::ClusterTestOpt;
use std::sync::Arc;
use sui::client_commands::WalletContext;
use sui_json_rpc_types::SuiTransactionResponse;
use test_utils::messages::make_transactions_with_wallet_context;

use sui_sdk::SuiClient;
use sui_types::gas_coin::GasCoin;
Expand All @@ -17,6 +18,7 @@ use sui_types::{
};
use test_case::{
call_contract_test::CallContractTest, coin_merge_split_test::CoinMergeSplitTest,
fullnode_execute_transaction_test::FullNodeExecuteTransactionTest,
native_transfer_test::NativeTransferTest, shared_object_test::SharedCounterTest,
};
use tokio::time::{sleep, Duration};
Expand Down Expand Up @@ -73,6 +75,12 @@ impl TestContext {
self.client.get_wallet_address()
}

/// See `make_transactions_with_wallet_context` for potential caveats
/// of this helper function.
pub async fn make_transactions(&mut self, max_txn_num: usize) -> Vec<Transaction> {
make_transactions_with_wallet_context(self.get_wallet_mut(), max_txn_num).await
}

async fn sign_and_execute(
&self,
txn_data: TransactionData,
Expand Down Expand Up @@ -156,6 +164,7 @@ impl ClusterTest {
TestCase::new(CoinMergeSplitTest {}),
TestCase::new(CallContractTest {}),
TestCase::new(SharedCounterTest {}),
TestCase::new(FullNodeExecuteTransactionTest {}),
];

// TODO: improve the runner parallelism for efficiency
Expand Down
1 change: 1 addition & 0 deletions crates/sui-cluster-test/src/test_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

pub mod call_contract_test;
pub mod coin_merge_split_test;
pub mod fullnode_execute_transaction_test;
pub mod native_transfer_test;
pub mod shared_object_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{TestCaseImpl, TestContext};
use async_trait::async_trait;
use sui_json_rpc_types::{SuiExecuteTransactionResponse, SuiExecutionStatus};
use sui_types::messages::ExecuteTransactionRequestType;
use tracing::info;

pub struct FullNodeExecuteTransactionTest;

#[async_trait]
impl TestCaseImpl for FullNodeExecuteTransactionTest {
fn name(&self) -> &'static str {
"FullNodeExecuteTransaction"
}

fn description(&self) -> &'static str {
"Test executing transaction via Fullnode Quorum Driver"
}

async fn run(&self, ctx: &mut TestContext) -> Result<(), anyhow::Error> {
ctx.get_sui_from_faucet(Some(3)).await;
let mut txns = ctx.make_transactions(3).await;
assert!(
txns.len() >= 3,
"Expect at least 3 txns, but only got {}. Do we get enough gas objects from faucet?",
txns.len(),
);

let fullnode = ctx.get_fullnode();

// Test WaitForEffectsCert
let txn = txns.swap_remove(0);
let txn_digest = *txn.digest();

info!("Test execution with ImmediateReturn");
let response = fullnode
.execute_transaction_by_fullnode(
txn.clone(),
ExecuteTransactionRequestType::ImmediateReturn,
)
.await?;
if let SuiExecuteTransactionResponse::ImmediateReturn { tx_digest } = response {
assert_eq!(txn_digest, tx_digest);

// Verify fullnode observes the txn
ctx.let_fullnode_sync().await;

fullnode
.get_transaction(tx_digest)
.await
.unwrap_or_else(|e| {
panic!(
"Failed get transaction {:?} from fullnode: {:?}",
txn_digest, e
)
});
} else {
panic!("Expect ImmediateReturn but got {:?}", response);
}

info!("Test execution with WaitForTxCert");
let txn = txns.swap_remove(0);
let txn_digest = *txn.digest();
let response = fullnode
.execute_transaction_by_fullnode(
txn.clone(),
ExecuteTransactionRequestType::WaitForTxCert,
)
.await?;
if let SuiExecuteTransactionResponse::TxCert { certificate } = response {
assert_eq!(txn_digest, certificate.transaction_digest);

// Verify fullnode observes the txn
ctx.let_fullnode_sync().await;

fullnode
.get_transaction(txn_digest)
.await
.unwrap_or_else(|e| {
panic!(
"Failed get transaction {:?} from fullnode: {:?}",
txn_digest, e
)
});
} else {
panic!("Expect TxCert but got {:?}", response);
}

info!("Test execution with WaitForEffectsCert");
let txn = txns.swap_remove(0);
let txn_digest = *txn.digest();

let response = fullnode
.execute_transaction_by_fullnode(txn, ExecuteTransactionRequestType::WaitForEffectsCert)
.await?;
if let SuiExecuteTransactionResponse::EffectsCert {
certificate,
effects,
} = response
{
assert_eq!(txn_digest, certificate.transaction_digest);
if !matches!(effects.effects.status, SuiExecutionStatus::Success { .. }) {
panic!(
"Failed to execute transfer tranasction {:?}: {:?}",
txn_digest, effects.effects.status
)
}
// Verify fullnode observes the txn
ctx.let_fullnode_sync().await;

fullnode
.get_transaction(txn_digest)
.await
.unwrap_or_else(|e| {
panic!(
"Failed get transaction {:?} from fullnode: {:?}",
txn_digest, e
)
});
} else {
panic!("Expect EffectsCert but got {:?}", response);
}

Ok(())
}
}
3 changes: 1 addition & 2 deletions crates/sui-faucet/src/faucet/simple_faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl SimpleFaucet {
// Ok to unwrap() since `get_gas_objects` guarantees gas
.map(|q| GasCoin::try_from(&q.1).unwrap())
.collect::<Vec<GasCoin>>();
info!("Coins held: {:?}", coins);

let (producer, consumer) = mpsc::channel(coins.len());
for coin in &coins {
Expand All @@ -75,7 +74,7 @@ impl SimpleFaucet {
}
}

info!("Using coins: {:?}", coins);
debug!("Using coins: {:?}", coins);

let metrics = FaucetMetrics::new(prometheus_registry);

Expand Down
42 changes: 40 additions & 2 deletions crates/sui-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_core::Stream;
use jsonrpsee::core::client::Subscription;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use rpc_types::SuiExecuteTransactionResponse;
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Write;
Expand All @@ -15,6 +16,7 @@ use sui_config::gateway::GatewayConfig;
use sui_core::gateway_state::{GatewayClient, GatewayState};
use sui_json::SuiJsonValue;
use sui_json_rpc::api::EventStreamingApiClient;
use sui_json_rpc::api::QuorumDriverApiClient;
use sui_json_rpc::api::RpcBcsApiClient;
use sui_json_rpc::api::RpcFullNodeReadApiClient;
use sui_json_rpc::api::RpcGatewayApiClient;
Expand All @@ -30,6 +32,7 @@ use sui_types::base_types::{ObjectID, SuiAddress, TransactionDigest};
use sui_types::crypto::SignableBytes;
use sui_types::messages::{Transaction, TransactionData};
use sui_types::sui_serde::Base64;
use types::messages::ExecuteTransactionRequestType;

pub mod crypto;

Expand Down Expand Up @@ -224,18 +227,53 @@ impl SuiClient {
Ok(match &self {
Self::Http(c) => {
let (tx_bytes, flag, signature, pub_key) = tx.to_network_data_for_execution();
c.execute_transaction(tx_bytes, flag, signature, pub_key)
RpcGatewayApiClient::execute_transaction(c, tx_bytes, flag, signature, pub_key)
.await?
}
Self::Ws(c) => {
let (tx_bytes, flag, signature, pub_key) = tx.to_network_data_for_execution();
c.execute_transaction(tx_bytes, flag, signature, pub_key)
RpcGatewayApiClient::execute_transaction(c, tx_bytes, flag, signature, pub_key)
.await?
}
Self::Embedded(c) => c.execute_transaction(tx).await?,
})
}

pub async fn execute_transaction_by_fullnode(
&self,
tx: Transaction,
request_type: ExecuteTransactionRequestType,
) -> anyhow::Result<SuiExecuteTransactionResponse> {
Ok(match &self {
Self::Http(c) => {
let (tx_bytes, flag, signature, pub_key) = tx.to_network_data_for_execution();
QuorumDriverApiClient::execute_transaction(
c,
tx_bytes,
flag,
signature,
pub_key,
request_type,
)
.await?
}
Self::Ws(c) => {
let (tx_bytes, flag, signature, pub_key) = tx.to_network_data_for_execution();
QuorumDriverApiClient::execute_transaction(
c,
tx_bytes,
flag,
signature,
pub_key,
request_type,
)
.await?
}
// TODO do we want to support an embedded quorum driver?
Self::Embedded(_c) => unimplemented!(),
})
}

pub async fn transfer_object(
&self,
signer: SuiAddress,
Expand Down
8 changes: 4 additions & 4 deletions crates/sui/src/client_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl SuiClientCommands {
.await?
.iter()
// Ok to unwrap() since `get_gas_objects` guarantees gas
.map(|(_, object)| GasCoin::try_from(object).unwrap())
.map(|(_val, object, _object_ref)| GasCoin::try_from(object).unwrap())
.collect();
SuiClientCommandResult::Gas(coins)
}
Expand Down Expand Up @@ -561,7 +561,7 @@ impl WalletContext {
pub async fn gas_objects(
&self,
address: SuiAddress,
) -> Result<Vec<(u64, SuiParsedObject)>, anyhow::Error> {
) -> Result<Vec<(u64, SuiParsedObject, SuiObjectInfo)>, anyhow::Error> {
let object_refs = self.gateway.get_objects_owned_by_address(address).await?;

// TODO: We should ideally fetch the objects from local cache
Expand All @@ -572,7 +572,7 @@ impl WalletContext {
if matches!( o.data.type_(), Some(v) if *v == GasCoin::type_().to_string()) {
// Okay to unwrap() since we already checked type
let gas_coin = GasCoin::try_from(&o)?;
values_objects.push((gas_coin.value(), o));
values_objects.push((gas_coin.value(), o, oref));
}
}
_ => continue,
Expand Down Expand Up @@ -607,7 +607,7 @@ impl WalletContext {
) -> Result<(u64, SuiParsedObject), anyhow::Error> {
for o in self.gas_objects(address).await.unwrap() {
if o.0 >= budget && !forbidden_gas_objects.contains(&o.1.id()) {
return Ok(o);
return Ok((o.0, o.1));
}
}
Err(anyhow!(
Expand Down
24 changes: 19 additions & 5 deletions crates/test-utils/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,43 @@ pub async fn get_account_and_gas_coins(
Ok(res)
}

/// A helper function to get all accounts and their owned objects
pub async fn get_gas_objects_with_wallet_context(
context: &WalletContext,
address: SuiAddress,
) -> Vec<SuiObjectInfo> {
context
.gas_objects(address)
.await
.unwrap()
.into_iter()
.map(|(_val, _object, object_ref)| object_ref)
.collect()
}

/// A helper function to get all accounts and their owned gas objects
/// with a WalletContext.
pub async fn get_account_and_objects(
pub async fn get_account_and_gas_objects(
context: &WalletContext,
) -> Vec<(SuiAddress, Vec<SuiObjectInfo>)> {
let owned_gas_objects = futures::future::join_all(
context
.keystore
.addresses()
.iter()
.map(|account| context.gateway.get_objects_owned_by_address(*account)),
.map(|account| get_gas_objects_with_wallet_context(context, *account)),
)
.await;
context
.keystore
.addresses()
.iter()
.zip(owned_gas_objects.into_iter())
.map(|(address, objects)| (*address, objects.unwrap()))
.map(|(address, objects)| (*address, objects))
.collect::<Vec<_>>()
}

/// A helper function to make Transactions with controlled accounts in WalletContext.
/// Particularly, the wallet needs to own gas objects for transactions.
/// However, if this function is called multiple times without any "sync" actions
/// on gas object management, txns may fail and objects may be locked.
///
Expand All @@ -86,7 +100,7 @@ pub async fn make_transactions_with_wallet_context(
max_txn_num: usize,
) -> Vec<Transaction> {
let recipient = get_key_pair::<AuthorityKeyPair>().0;
let accounts_and_objs = get_account_and_objects(context).await;
let accounts_and_objs = get_account_and_gas_objects(context).await;
let mut res = Vec::with_capacity(max_txn_num);
for (address, objs) in &accounts_and_objs {
for obj in objs {
Expand Down

0 comments on commit a0cc1c5

Please sign in to comment.