Skip to content

Commit

Permalink
deterministically wait for fullnode to sync txns (MystenLabs#4810)
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu authored Sep 27, 2022
1 parent 0204ed3 commit ad5d79f
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 174 deletions.
101 changes: 11 additions & 90 deletions crates/sui-cluster-test/src/faucet.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use super::{
cluster::{new_wallet_context_from_cluster, Cluster},
helper::ObjectChecker,
wallet_client::WalletClient,
};
use anyhow::bail;
use super::cluster::{new_wallet_context_from_cluster, Cluster};
use async_trait::async_trait;
use clap::*;
use std::collections::HashMap;
use std::sync::Arc;
use sui_faucet::{CoinInfo, Faucet, FaucetResponse, SimpleFaucet};
use sui_faucet::{Faucet, FaucetResponse, SimpleFaucet};
use sui_types::base_types::{encode_bytes_hex, SuiAddress};
use sui_types::crypto::KeypairTraits;
use sui_types::{
base_types::{encode_bytes_hex, SuiAddress},
gas_coin::GasCoin,
object::Owner,
};
use tokio::time::{sleep, Duration};
use tracing::{debug, info, info_span, Instrument};
use uuid::Uuid;

Expand Down Expand Up @@ -52,12 +41,7 @@ impl FaucetClientFactory {
/// Faucet Client abstraction
#[async_trait]
pub trait FaucetClient {
async fn request_sui_coins(
&self,
client: &WalletClient,
minimum_coins: Option<usize>,
request_address: Option<SuiAddress>,
) -> Result<Vec<GasCoin>, anyhow::Error>;
async fn request_sui_coins(&self, request_address: SuiAddress) -> FaucetResponse;
}

/// Client for a remote faucet that is accessible by POST requests
Expand All @@ -76,16 +60,10 @@ impl RemoteFaucetClient {
impl FaucetClient for RemoteFaucetClient {
/// Request test SUI coins from faucet.
/// It also verifies the effects are observed by gateway/fullnode.
async fn request_sui_coins(
&self,
client: &WalletClient,
minimum_coins: Option<usize>,
request_address: Option<SuiAddress>,
) -> Result<Vec<GasCoin>, anyhow::Error> {
async fn request_sui_coins(&self, request_address: SuiAddress) -> FaucetResponse {
let gas_url = format!("{}/gas", self.remote_url);
debug!("Getting coin from remote faucet {}", gas_url);
let address = request_address.unwrap_or_else(|| client.get_wallet_address());
let data = HashMap::from([("recipient", encode_bytes_hex(&address))]);
let data = HashMap::from([("recipient", encode_bytes_hex(&request_address))]);
let map = HashMap::from([("FixedAmountRequest", data)]);

let response = reqwest::Client::new()
Expand All @@ -101,27 +79,9 @@ impl FaucetClient for RemoteFaucetClient {

if let Some(error) = faucet_response.error {
panic!("Failed to get gas tokens with error: {}", error)
}

sleep(Duration::from_secs(2)).await;

let gas_coins = into_gas_coin_with_owner_check(
faucet_response.transferred_gas_objects,
address,
client,
)
.await;

let minimum_coins = minimum_coins.unwrap_or(5);
};

if gas_coins.len() < minimum_coins {
bail!(
"Expect to get at least {minimum_coins} Sui Coins for address {address}, but only got {}",
gas_coins.len()
)
}

Ok(gas_coins)
faucet_response
}
}

Expand All @@ -138,52 +98,13 @@ impl LocalFaucetClient {
}
#[async_trait]
impl FaucetClient for LocalFaucetClient {
async fn request_sui_coins(
&self,
client: &WalletClient,
minimum_coins: Option<usize>,
request_address: Option<SuiAddress>,
) -> Result<Vec<GasCoin>, anyhow::Error> {
let address = request_address.unwrap_or_else(|| client.get_wallet_address());
async fn request_sui_coins(&self, request_address: SuiAddress) -> FaucetResponse {
let receipt = self
.simple_faucet
.send(Uuid::new_v4(), address, &[50000; 5])
.send(Uuid::new_v4(), request_address, &[50000; 5])
.await
.unwrap_or_else(|err| panic!("Failed to get gas tokens with error: {}", err));

sleep(Duration::from_secs(2)).await;

let gas_coins = into_gas_coin_with_owner_check(receipt.sent, address, client).await;

let minimum_coins = minimum_coins.unwrap_or(5);

if gas_coins.len() < minimum_coins {
bail!(
"Expect to get at least {minimum_coins} Sui Coins for address {address}, but only got {}. Try minting more coins on genesis.",
gas_coins.len()
)
}

Ok(gas_coins)
receipt.into()
}
}

async fn into_gas_coin_with_owner_check(
coin_info: Vec<CoinInfo>,
owner: SuiAddress,
client: &WalletClient,
) -> Vec<GasCoin> {
futures::future::join_all(
coin_info
.iter()
.map(|coin_info| {
ObjectChecker::new(coin_info.id)
.owner(Owner::AddressOwner(owner))
.check_into_gas_coin(client.get_fullnode())
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.collect::<Vec<_>>()
}
27 changes: 14 additions & 13 deletions crates/sui-cluster-test/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use tracing::debug;
/// Use builder style to construct the conditions.
/// When optionals fields are not set, related checks are omitted.
/// Consuming functions such as `check` perform the check and panics if
/// verification results are unexpected. `check_into_sui_object` and
/// `check_info_gas_object` expect to get a `SuiObject` and `GasObject`
/// verification results are unexpected. `check_into_object` and
/// `check_into_gas_coin` expect to get a `SuiRawObject` and `GasCoin`
/// respectfully.
/// ```
#[derive(Debug)]
Expand Down Expand Up @@ -65,8 +65,8 @@ impl ObjectChecker {
.into_gas_coin()
}

pub async fn check_into_sui_object(self, client: &SuiClient) -> SuiRawObject {
self.check(client).await.unwrap().into_sui_object()
pub async fn check_into_object(self, client: &SuiClient) -> SuiRawObject {
self.check(client).await.unwrap().into_object()
}

pub async fn check(self, client: &SuiClient) -> Result<CheckerResultObject, anyhow::Error> {
Expand All @@ -83,7 +83,11 @@ impl ObjectChecker {

match object_info {
GetRawObjectDataResponse::NotExists(_) => {
panic!("Node can't find gas object {}", object_id)
panic!(
"Node can't find gas object {} with client {:?}",
object_id,
client.read_api()
)
}
GetRawObjectDataResponse::Deleted(_) => {
if !self.is_deleted {
Expand Down Expand Up @@ -119,21 +123,18 @@ impl ObjectChecker {

pub struct CheckerResultObject {
gas_coin: Option<GasCoin>,
sui_object: Option<SuiRawObject>,
object: Option<SuiRawObject>,
}

impl CheckerResultObject {
pub fn new(gas_coin: Option<GasCoin>, sui_object: Option<SuiRawObject>) -> Self {
Self {
gas_coin,
sui_object,
}
pub fn new(gas_coin: Option<GasCoin>, object: Option<SuiRawObject>) -> Self {
Self { gas_coin, object }
}
pub fn into_gas_coin(self) -> GasCoin {
self.gas_coin.unwrap()
}
pub fn into_sui_object(self) -> SuiRawObject {
self.sui_object.unwrap()
pub fn into_object(self) -> SuiRawObject {
self.object.unwrap()
}
}

Expand Down
103 changes: 95 additions & 8 deletions crates/sui-cluster-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ use async_trait::async_trait;
use clap::*;
use cluster::{Cluster, ClusterFactory};
use config::ClusterTestOpt;
use futures::{stream::FuturesUnordered, StreamExt};
use helper::ObjectChecker;
use std::sync::Arc;
use sui::client_commands::WalletContext;
use sui_faucet::CoinInfo;
use sui_json_rpc_types::SuiTransactionResponse;
use sui_types::base_types::TransactionDigest;
use sui_types::object::Owner;
use test_utils::messages::make_transactions_with_wallet_context;

use sui_sdk::SuiClient;
Expand All @@ -21,7 +26,7 @@ use test_case::{
fullnode_execute_transaction_test::FullNodeExecuteTransactionTest,
native_transfer_test::NativeTransferTest, shared_object_test::SharedCounterTest,
};
use tokio::time::{sleep, Duration};
use tokio::time::{self, Duration};
use tracing::{error, info};
use wallet_client::WalletClient;

Expand All @@ -45,10 +50,30 @@ pub struct TestContext {

impl TestContext {
async fn get_sui_from_faucet(&self, minimum_coins: Option<usize>) -> Vec<GasCoin> {
self.faucet
.request_sui_coins(self.get_context(), minimum_coins, None)
.await
.unwrap_or_else(|e| panic!("Failed to get test SUI coins from faucet, {e}"))
let addr = self.get_wallet_address();
let faucet_response = self.faucet.request_sui_coins(addr).await;

let coin_info = faucet_response
.transferred_gas_objects
.iter()
.map(|coin_info| coin_info.transfer_tx_digest)
.collect::<Vec<_>>();
self.let_fullnode_sync(coin_info, 5).await;

let gas_coins = self
.check_owner_and_into_gas_coin(faucet_response.transferred_gas_objects, addr)
.await;

let minimum_coins = minimum_coins.unwrap_or(5);

if gas_coins.len() < minimum_coins {
panic!(
"Expect to get at least {minimum_coins} Sui Coins for address {addr}, but only got {}",
gas_coins.len()
)
}

gas_coins
}

fn get_context(&self) -> &WalletClient {
Expand Down Expand Up @@ -108,9 +133,71 @@ impl TestContext {
// TODO: figure out a more efficient way to test a local cluster
// A potential way to do this is to subscribe to txns from fullnode
// when the feature is ready
pub async fn let_fullnode_sync(&self) {
let duration = Duration::from_secs(5);
sleep(duration).await;
pub async fn let_fullnode_sync(&self, digests: Vec<TransactionDigest>, timeout_sec: u64) {
let mut futures = FuturesUnordered::new();
for digest in digests.clone() {
let task = self.get_tx_with_retry_times(digest, 1);
futures.push(Box::pin(task));
}
let mut sleep = Box::pin(time::sleep(Duration::from_secs(timeout_sec)));

loop {
tokio::select! {
_ = &mut sleep => {
panic!("Fullnode does not know all of {:?} after {} secs.", digests, timeout_sec);
}
res = futures.next() => {
match res {
Some((true, _, _)) => {},
Some((false, digest, retry_times)) => {
let task = self.get_tx_with_retry_times(digest, retry_times);
futures.push(Box::pin(task));
},
None => break, // all txns appear on fullnode, mission completed
}
}
}
}
}

async fn get_tx_with_retry_times(
&self,
digest: TransactionDigest,
retry_times: u64,
) -> (bool, TransactionDigest, u64) {
match self
.client
.get_fullnode()
.read_api()
.get_transaction(digest)
.await
{
Ok(_) => (true, digest, retry_times),
Err(_) => {
time::sleep(Duration::from_millis(300 * retry_times)).await;
(false, digest, retry_times + 1)
}
}
}

async fn check_owner_and_into_gas_coin(
&self,
coin_info: Vec<CoinInfo>,
owner: SuiAddress,
) -> Vec<GasCoin> {
futures::future::join_all(
coin_info
.iter()
.map(|coin_info| {
ObjectChecker::new(coin_info.id)
.owner(Owner::AddressOwner(owner))
.check_into_gas_coin(self.get_fullnode())
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.collect::<Vec<_>>()
}
}

Expand Down
9 changes: 5 additions & 4 deletions crates/sui-cluster-test/src/test_case/call_contract_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,16 @@ impl TestCaseImpl for CallContractTest {
)).unwrap_or_else(|| panic!("Expect such a MoveEvent in events {:?}", events));

// Verify fullnode observes the txn
ctx.let_fullnode_sync().await;
ctx.let_fullnode_sync(vec![effects.transaction_digest], 5)
.await;

let sui_object = ObjectChecker::new(nft_id)
let object = ObjectChecker::new(nft_id)
.owner(Owner::AddressOwner(signer))
.check_into_sui_object(ctx.get_fullnode())
.check_into_object(ctx.get_fullnode())
.await;

assert_eq!(
sui_object.reference.version,
object.reference.version,
SequenceNumber::from_u64(1),
"Expect sequence number to be 1"
);
Expand Down
Loading

0 comments on commit ad5d79f

Please sign in to comment.