Skip to content

Commit

Permalink
[Forge] Add simple test for fast sync throughput.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 16, 2022
1 parent 579848d commit d7c3944
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Continuously runs the fullnode fast-sync state sync performance test
# every 3 hours (and on manual dispatch), posting results to Slack.
name: Continuous E2E State Sync Performance Test For Fullnode Fast Sync

permissions:
  issues: write
  pull-requests: write

on:
  workflow_dispatch:
  schedule:
    - cron: "0 */3 * * *"

jobs:
  # Performance test in an optimal setting
  run-forge-state-sync-perf-fullnode-fast-sync-test:
    uses: ./.github/workflows/run-forge.yaml
    secrets: inherit
    with:
      FORGE_NAMESPACE: forge-state-sync-perf-fullnode-fast-sync
      # Run for 40 minutes
      FORGE_RUNNER_DURATION_SECS: 2400
      FORGE_TEST_SUITE: state_sync_perf_fullnodes_fast_sync
      POST_TO_SLACK: true
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'t> AccountMinter<'t> {
coins_for_source
);

if req.promt_before_spending {
if req.prompt_before_spending {
if !prompt_yes(&format!(
"plan will consume in total {} balance, are you sure you want to proceed",
coins_for_source
Expand Down
8 changes: 4 additions & 4 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct EmitJobRequest {
txn_expiration_time_secs: u64,
expected_max_txns: u64,
expected_gas_per_txn: u64,
promt_before_spending: bool,
prompt_before_spending: bool,
}

impl Default for EmitJobRequest {
Expand All @@ -145,7 +145,7 @@ impl Default for EmitJobRequest {
txn_expiration_time_secs: 60,
expected_max_txns: MAX_TXNS,
expected_gas_per_txn: aptos_global_constants::MAX_GAS_AMOUNT,
promt_before_spending: false,
prompt_before_spending: false,
}
}
}
Expand Down Expand Up @@ -175,8 +175,8 @@ impl EmitJobRequest {
self
}

pub fn promt_before_spending(mut self) -> Self {
self.promt_before_spending = true;
pub fn prompt_before_spending(mut self) -> Self {
self.prompt_before_spending = true;
self
}

Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-emitter-lib/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn emit_transactions_with_cluster(
emit_job_request = emit_job_request.expected_gas_per_txn(expected_gas_per_txn);
}
if !cluster.coin_source_is_root {
emit_job_request = emit_job_request.promt_before_spending();
emit_job_request = emit_job_request.prompt_before_spending();
}
let stats = emitter
.emit_txn_for_with_stats(
Expand Down
31 changes: 29 additions & 2 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use testcases::load_vs_perf_benchmark::LoadVsPerfBenchmark;
use testcases::network_bandwidth_test::NetworkBandwidthTest;
use testcases::network_loss_test::NetworkLossTest;
use testcases::performance_with_fullnode_test::PerformanceBenchmarkWithFN;
use testcases::state_sync_performance::StateSyncValidatorPerformance;
use testcases::state_sync_performance::{
StateSyncFullnodeFastSyncPerformance, StateSyncValidatorPerformance,
};
use testcases::three_region_simulation_test::ThreeRegionSimulationTest;
use testcases::twin_validator_test::TwinValidatorTest;
use testcases::validator_join_leave_test::ValidatorJoinLeaveTest;
Expand Down Expand Up @@ -437,6 +439,7 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
"state_sync_perf_fullnodes_execute_transactions" => {
state_sync_perf_fullnodes_execute_transactions(config)
}
"state_sync_perf_fullnodes_fast_sync" => state_sync_perf_fullnodes_fast_sync(config),
"state_sync_perf_validators" => state_sync_perf_validators(config),
"validators_join_and_leave" => validators_join_and_leave(config),
"compat" => config
Expand Down Expand Up @@ -749,7 +752,6 @@ fn state_sync_perf_fullnodes_config(forge_config: ForgeConfig<'static>) -> Forge
forge_config
.with_initial_validator_count(NonZeroUsize::new(4).unwrap())
.with_initial_fullnode_count(4)
.with_network_tests(vec![&StateSyncFullnodePerformance])
}

/// The config for running a state sync performance test when applying
Expand All @@ -758,6 +760,7 @@ fn state_sync_perf_fullnodes_apply_outputs(
forge_config: ForgeConfig<'static>,
) -> ForgeConfig<'static> {
state_sync_perf_fullnodes_config(forge_config)
.with_network_tests(vec![&StateSyncFullnodePerformance])
.with_genesis_helm_config_fn(Arc::new(|helm_values| {
helm_values["chain"]["epoch_duration_secs"] = 600.into();
}))
Expand All @@ -776,6 +779,7 @@ fn state_sync_perf_fullnodes_execute_transactions(
forge_config: ForgeConfig<'static>,
) -> ForgeConfig<'static> {
state_sync_perf_fullnodes_config(forge_config)
.with_network_tests(vec![&StateSyncFullnodePerformance])
.with_genesis_helm_config_fn(Arc::new(|helm_values| {
helm_values["chain"]["epoch_duration_secs"] = 600.into();
}))
Expand All @@ -788,6 +792,29 @@ fn state_sync_perf_fullnodes_execute_transactions(
.with_success_criteria(SuccessCriteria::new(5000, 10000, false, None, None, None))
}

/// The config for running a state sync performance test when fast syncing
/// to the latest epoch.
fn state_sync_perf_fullnodes_fast_sync(forge_config: ForgeConfig<'static>) -> ForgeConfig<'static> {
    // Drive the network with account-creation traffic so that many state
    // values exist for the fullnodes to fast sync.
    let emit_job = EmitJobRequest::default()
        .mode(EmitJobMode::MaxLoad {
            mempool_backlog: 30000,
        })
        .transaction_type(TransactionType::AccountGeneration);

    state_sync_perf_fullnodes_config(forge_config)
        .with_network_tests(vec![&StateSyncFullnodeFastSyncPerformance])
        .with_genesis_helm_config_fn(Arc::new(|helm_values| {
            helm_values["chain"]["epoch_duration_secs"] = 180.into(); // Frequent epochs
        }))
        .with_emit_job(emit_job)
        .with_node_helm_config_fn(Arc::new(|helm_values| {
            // Bootstrap by downloading the latest states, then stay up to
            // date by applying transaction outputs
            let state_sync_driver =
                &mut helm_values["fullnode"]["config"]["state_sync"]["state_sync_driver"];
            state_sync_driver["bootstrapping_mode"] = "DownloadLatestStates".into();
            state_sync_driver["continuous_syncing_mode"] = "ApplyTransactionOutputs".into();
        }))
}

/// The config for running a state sync performance test when applying
/// transaction outputs in failed validators.
fn state_sync_perf_validators(forge_config: ForgeConfig<'static>) -> ForgeConfig<'static> {
Expand Down
169 changes: 138 additions & 31 deletions testsuite/forge/src/interface/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,29 @@ pub trait SwarmExt: Swarm {
Ok(())
}

/// Waits for all nodes to have caught up to the specified `target_version`.
///
/// Note: this is the trait-level convenience wrapper; it simply gathers all
/// node clients and delegates to the free function of the same name.
async fn wait_for_all_nodes_to_catchup_to_version(
    &self,
    target_version: u64,
    timeout: Duration,
) -> Result<()> {
    wait_for_all_nodes_to_catchup_to_version(
        &self.get_all_nodes_clients_with_names(),
        target_version,
        timeout,
    )
    .await
}

/// Waits for all nodes to have caught up to the specified `target_epoch`.
async fn wait_for_all_nodes_to_catchup_to_epoch(
&self,
target_epoch: u64,
timeout: Duration,
) -> Result<()> {
wait_for_all_nodes_to_catchup_to_epoch(
&self.get_all_nodes_clients_with_names(),
target_epoch,
timeout,
)
.await
Expand All @@ -259,6 +273,19 @@ pub trait SwarmExt: Swarm {
wait_for_all_nodes_to_catchup(&self.get_all_nodes_clients_with_names(), timeout).await
}

/// Wait for all nodes in the network to change epochs. This is done by first querying each node
/// for its current epoch, selecting the max epoch, then waiting for all nodes to sync to max
/// epoch + 1.
async fn wait_for_all_nodes_to_change_epoch(&self, timeout: Duration) -> Result<()> {
    let clients = self.get_all_nodes_clients_with_names();
    if clients.is_empty() {
        bail!("No nodes are available!")
    }

    // Everyone must reach at least one epoch beyond the current max
    let max_epoch = get_highest_synced_epoch(&clients).await?;
    wait_for_all_nodes_to_catchup_to_epoch(&clients, max_epoch + 1, timeout).await
}

async fn wait_for_all_nodes_to_catchup_to_next(&self, timeout: Duration) -> Result<()> {
let clients = self.get_all_nodes_clients_with_names();
let highest_synced_version = get_highest_synced_version(&clients).await?;
Expand Down Expand Up @@ -323,42 +350,107 @@ pub trait SwarmExt: Swarm {
}
}

/// Waits for all nodes to have caught up to the specified `target_version`.
///
/// Returns an error if the nodes fail to catch up within `timeout`.
pub async fn wait_for_all_nodes_to_catchup_to_version(
    clients: &[(String, RestClient)],
    target_version: u64,
    timeout: Duration,
) -> Result<()> {
    // Delegate to the combined helper with only the version target set
    wait_for_all_nodes_to_catchup_to_target_version_or_epoch(
        clients,
        Some(target_version),
        None,
        timeout,
    )
    .await
}

/// Waits for all nodes to have caught up to the specified `target_epoch`.
pub async fn wait_for_all_nodes_to_catchup_to_epoch(
    clients: &[(String, RestClient)],
    target_epoch: u64,
    timeout: Duration,
) -> Result<()> {
    // Delegate to the combined helper with only the epoch target set
    let target_version = None;
    wait_for_all_nodes_to_catchup_to_target_version_or_epoch(
        clients,
        target_version,
        Some(target_epoch),
        timeout,
    )
    .await
}

/// Waits for all nodes to have caught up to the specified `target_version` or `target_epoch`.
async fn wait_for_all_nodes_to_catchup_to_target_version_or_epoch(
clients: &[(String, RestClient)],
target_version: Option<u64>,
target_epoch: Option<u64>,
timeout: Duration,
) -> Result<()> {
if target_version.is_none() && target_epoch.is_none() {
bail!("No target version or epoch was specified!")
}

let start_time = Instant::now();
loop {
let results: Result<Vec<_>> = try_join_all(clients.iter().map(|(name, node)| async move {
Ok((
name,
node.get_ledger_information().await?.into_inner().version,
))
}))
.await;
let versions = results
.map(|resps| resps.into_iter().collect::<Vec<_>>())
// Fetch the current versions and epochs of all nodes
let version_and_epoch_results: Result<Vec<_>> =
try_join_all(clients.iter().map(|(node_name, node)| async move {
let node_ledger_info_response = node.get_ledger_information().await?.into_inner();
Ok((
node_name,
node_ledger_info_response.version,
node_ledger_info_response.epoch,
))
}))
.await;
let node_versions_and_epochs = version_and_epoch_results
.map(|results| results.into_iter().collect::<Vec<_>>())
.ok();
let all_caught_up = versions
.clone()
.map(|resps| resps.iter().all(|(_, v)| *v >= version))
.unwrap_or(false);
if all_caught_up {

// Check if all nodes are caught up to the target version
let all_caught_up_to_version = target_version
.map(|target_version| {
node_versions_and_epochs
.clone()
.map(|responses| {
responses
.iter()
.all(|(_, version, _)| *version >= target_version)
})
.unwrap_or(false) // No version found
})
.unwrap_or(true); // No target version was specified

// Check if all nodes are caught up to the target epoch
let all_caught_up_to_epoch = target_epoch
.map(|target_epoch| {
node_versions_and_epochs
.clone()
.map(|responses| responses.iter().all(|(_, _, epoch)| *epoch >= target_epoch))
.unwrap_or(false) // No epoch found
})
.unwrap_or(true); // No target epoch was specified

// Check if all targets have been met
if all_caught_up_to_version && all_caught_up_to_epoch {
info!(
"All nodes caught up successfully in {}s",
"All nodes caught up to target version and epoch ({:?}, {:?}) successfully, in {} seconds",
target_version,
target_epoch,
start_time.elapsed().as_secs()
);
return Ok(());
}

// Check if we've timed out while waiting
if start_time.elapsed() > timeout {
return Err(anyhow!(
"Waiting for nodes to catch up to version {} timed out after {}s, current status: {:?}",
version,
"Waiting for nodes to catch up to target version and epoch ({:?}, {:?}) timed out after {} seconds, current status: {:?}",
target_version,
target_epoch,
start_time.elapsed().as_secs(),
versions.unwrap_or_default()
node_versions_and_epochs.unwrap_or_default()
));
}

Expand All @@ -375,22 +467,37 @@ pub async fn wait_for_all_nodes_to_catchup(
timeout: Duration,
) -> Result<()> {
if clients.is_empty() {
bail!("no nodes available")
bail!("No nodes are available!")
}
let highest_synced_version = get_highest_synced_version(clients).await?;
wait_for_all_nodes_to_catchup_to_version(clients, highest_synced_version, timeout).await
}

/// Returns the highest synced version of the given clients
pub async fn get_highest_synced_version(clients: &[(String, RestClient)]) -> Result<u64> {
let mut latest_version = 0u64;
for (_, c) in clients {
latest_version = latest_version.max(
c.get_ledger_information()
let (highest_synced_version, _) = get_highest_synced_version_and_epoch(clients).await?;
Ok(highest_synced_version)
}

/// Returns the highest synced epoch of the given clients
pub async fn get_highest_synced_epoch(clients: &[(String, RestClient)]) -> Result<u64> {
    // Drop the version half of the pair and surface only the epoch
    get_highest_synced_version_and_epoch(clients)
        .await
        .map(|(_, highest_synced_epoch)| highest_synced_epoch)
}

/// Returns the highest synced version and epoch of the given clients.
///
/// Note: the `(version, epoch)` pairs are compared lexicographically by
/// `max`, so the returned pair comes from the single node with the highest
/// version (ties broken by epoch). Nodes whose ledger info request fails
/// are treated as being at `(0, 0)`.
async fn get_highest_synced_version_and_epoch(
    clients: &[(String, RestClient)],
) -> Result<(u64, u64)> {
    let mut latest_version_and_epoch = (0, 0);
    for (_, client) in clients {
        latest_version_and_epoch = latest_version_and_epoch.max(
            client
                .get_ledger_information()
                .await
                .map(|r| (r.inner().version, r.inner().epoch))
                .unwrap_or((0, 0)),
        );
    }
    Ok(latest_version_and_epoch)
}
Loading

0 comments on commit d7c3944

Please sign in to comment.