From 9e14a63466a761e742c9118fa8e406f843b77b60 Mon Sep 17 00:00:00 2001 From: igor-aptos <110557261+igor-aptos@users.noreply.github.com> Date: Fri, 30 Sep 2022 11:49:17 -0700 Subject: [PATCH] [test on forge] Add state sync catching up stress tests (#4036) --- ...-state-sync-failures-catching-up-test.yaml | 23 +++ ...sync-slow-processing-catching-up-test.yaml | 23 +++ testsuite/forge-cli/src/main.rs | 44 +++-- testsuite/forge/src/backend/k8s/swarm.rs | 17 +- testsuite/forge/src/interface/swarm.rs | 30 +++- testsuite/forge/src/success_criteria.rs | 38 ++--- .../forge/src/test_utils/consensus_utils.rs | 21 +-- .../src/consensus_reliability_tests.rs | 161 +++++++++++++----- testsuite/testcases/src/lib.rs | 55 +++--- 9 files changed, 271 insertions(+), 141 deletions(-) create mode 100644 .github/workflows/continuous-e2e-state-sync-failures-catching-up-test.yaml create mode 100644 .github/workflows/continuous-e2e-state-sync-slow-processing-catching-up-test.yaml diff --git a/.github/workflows/continuous-e2e-state-sync-failures-catching-up-test.yaml b/.github/workflows/continuous-e2e-state-sync-failures-catching-up-test.yaml new file mode 100644 index 0000000000000..47f3152ee5b00 --- /dev/null +++ b/.github/workflows/continuous-e2e-state-sync-failures-catching-up-test.yaml @@ -0,0 +1,23 @@ +name: Continuous E2E State Sync Failures Catching Up Test + +permissions: + issues: write + pull-requests: write + +on: + workflow_dispatch: + schedule: + - cron: "0 */3 * * *" + +jobs: + ### Please remember to use different namespace for different tests + run-state-sync-failures-catching-up-test: + uses: ./.github/workflows/run-forge.yaml + secrets: inherit + with: + FORGE_NAMESPACE: forge-state-sync-failures-catching-up-test + FORGE_CLUSTER_NAME: aptos-forge-big-1 + FORGE_RUNNER_DURATION_SECS: 900 + FORGE_TEST_SUITE: failures_catching_up + POST_TO_SLACK: true + FORGE_ENABLE_FAILPOINTS: true diff --git a/.github/workflows/continuous-e2e-state-sync-slow-processing-catching-up-test.yaml b/.github/workflows/continuous-e2e-state-sync-slow-processing-catching-up-test.yaml new file mode 100644 index 0000000000000..0a070ff49ace2 --- /dev/null +++ b/.github/workflows/continuous-e2e-state-sync-slow-processing-catching-up-test.yaml @@ -0,0 +1,23 @@ +name: Continuous E2E State Sync Slow Processing Catching Up Test + +permissions: + issues: write + pull-requests: write + +on: + workflow_dispatch: + schedule: + - cron: "0 */3 * * *" + +jobs: + ### Please remember to use different namespace for different tests + run-state-sync-slow-processing-catching-up-test: + uses: ./.github/workflows/run-forge.yaml + secrets: inherit + with: + FORGE_NAMESPACE: forge-state-sync-slow-processing-catching-up-test + FORGE_CLUSTER_NAME: aptos-forge-big-1 + FORGE_RUNNER_DURATION_SECS: 900 + FORGE_TEST_SUITE: slow_processing_catching_up + POST_TO_SLACK: true + FORGE_ENABLE_FAILPOINTS: true diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index cc51c448669da..a737fffe9f79f 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -615,7 +615,6 @@ fn single_test_suite(test_name: &str) -> Result> { // to test different timings) check_period_s: 27, }, - false, ), "changing_working_quorum_test" => changing_working_quorum_test( 20, @@ -634,7 +633,6 @@ fn single_test_suite(test_name: &str) -> Result> { // to test different timings) check_period_s: 27, }, - false, ), "changing_working_quorum_test_high_load" => changing_working_quorum_test( 20, @@ -653,9 +651,6 @@ fn single_test_suite(test_name: 
&str) -> Result> { // to test different timings) check_period_s: 27, }, - // Max load cannot be sustained without gaps in progress. - // Using high load instead. - false, ), // not scheduled on continuous "large_test_only_few_nodes_down" => changing_working_quorum_test( @@ -671,7 +666,6 @@ fn single_test_suite(test_name: &str) -> Result> { add_execution_delay: false, check_period_s: 27, }, - false, ), "different_node_speed_and_reliability_test" => changing_working_quorum_test( 20, @@ -686,7 +680,34 @@ fn single_test_suite(test_name: &str) -> Result> { add_execution_delay: true, check_period_s: 27, }, - false, + ), + "slow_processing_catching_up" => changing_working_quorum_test( + 10, + 300, + 3000, + 2500, + &ChangingWorkingQuorumTest { + min_tps: 1500, + always_healthy_nodes: 2, + max_down_nodes: 0, + num_large_validators: 2, + add_execution_delay: true, + check_period_s: 57, + }, + ), + "failures_catching_up" => changing_working_quorum_test( + 10, + 300, + 3000, + 2500, + &ChangingWorkingQuorumTest { + min_tps: 1500, + always_healthy_nodes: 2, + max_down_nodes: 1, + num_large_validators: 2, + add_execution_delay: false, + check_period_s: 27, + }, ), "twin_validator_test" => config .with_network_tests(vec![&TwinValidatorTest]) @@ -879,7 +900,6 @@ fn changing_working_quorum_test( target_tps: usize, min_avg_tps: usize, test: &'static ChangingWorkingQuorumTest, - max_load: bool, ) -> ForgeConfig<'static> { let config = ForgeConfig::default(); let num_large_validators = test.num_large_validators; @@ -910,13 +930,7 @@ fn changing_working_quorum_test( })) .with_emit_job( EmitJobRequest::default() - .mode(if max_load { - EmitJobMode::MaxLoad { - mempool_backlog: 20000, - } - } else { - EmitJobMode::ConstTps { tps: target_tps } - }) + .mode(EmitJobMode::ConstTps { tps: target_tps }) .transaction_mix(vec![ (TransactionType::P2P, 80), (TransactionType::AccountGeneration, 20), diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs index 2cc3883c62c5c..e7584967c52a1 100644 --- a/testsuite/forge/src/backend/k8s/swarm.rs +++ b/testsuite/forge/src/backend/k8s/swarm.rs @@ -287,31 +287,27 @@ impl Swarm for K8sSwarm { async fn ensure_no_validator_restart(&self) -> Result<()> { for validator in &self.validators { - if let Err(e) = check_for_container_restart( + check_for_container_restart( &self.kube_client, &self.kube_namespace.clone(), validator.1.stateful_set_name(), ) - .await - { - return Err(e); - } + .await?; } + info!("Found no validator restarts"); Ok(()) } async fn ensure_no_fullnode_restart(&self) -> Result<()> { for fullnode in &self.fullnodes { - if let Err(e) = check_for_container_restart( + check_for_container_restart( &self.kube_client, &self.kube_namespace.clone(), fullnode.1.stateful_set_name(), ) - .await - { - return Err(e); - } + .await?; } + info!("Found no fullnode restarts"); Ok(()) } @@ -345,6 +341,7 @@ impl Swarm for K8sSwarm { ) .await?; threshold.ensure_threshold(&system_metrics)?; + info!("System metrics are healthy"); Ok(()) } else { bail!("No prom client"); diff --git a/testsuite/forge/src/interface/swarm.rs b/testsuite/forge/src/interface/swarm.rs index a95febb60eeec..79bd66282ce54 100644 --- a/testsuite/forge/src/interface/swarm.rs +++ b/testsuite/forge/src/interface/swarm.rs @@ -10,7 +10,7 @@ use aptos_config::config::NodeConfig; use aptos_logger::info; use aptos_rest_client::Client as RestClient; use aptos_sdk::types::PeerId; -use futures::future::try_join_all; +use futures::future::{join_all, try_join_all}; use 
prometheus_http_query::response::PromqlResult; use std::time::{Duration, Instant}; use tokio::runtime::Runtime; @@ -296,6 +296,31 @@ pub trait SwarmExt: Swarm { }) .collect() } + + async fn get_client_with_newest_ledger_version(&self) -> Option<(u64, RestClient)> { + let clients = self.get_all_nodes_clients_with_names(); + let ledger_infos = join_all(clients.iter().map(|(_name, client)| async { + let start = Instant::now(); + let result = client.get_ledger_information().await; + + info!( + "Fetch from {:?} took {}ms, at version: {}", + client.path_prefix_string(), + start.elapsed().as_millis(), + result + .as_ref() + .map(|r| r.inner().version as i64) + .unwrap_or(-1) + ); + result + })) + .await; + ledger_infos + .into_iter() + .zip(clients.into_iter()) + .flat_map(|(resp, (_, client))| resp.map(|r| (r.into_inner().version, client))) + .max_by_key(|(v, _c)| *v) + } } /// Waits for all nodes to have caught up to the specified `verison`. @@ -330,8 +355,9 @@ pub async fn wait_for_all_nodes_to_catchup_to_version( if start_time.elapsed() > timeout { return Err(anyhow!( - "Waiting for nodes to catch up to version {} timed out, current status: {:?}", + "Waiting for nodes to catch up to version {} timed out after {}s, current status: {:?}", version, + start_time.elapsed().as_secs(), versions.unwrap_or_default() )); } diff --git a/testsuite/forge/src/success_criteria.rs b/testsuite/forge/src/success_criteria.rs index 978e66e34b290..40d3365af2429 100644 --- a/testsuite/forge/src/success_criteria.rs +++ b/testsuite/forge/src/success_criteria.rs @@ -1,10 +1,9 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use anyhow::bail; +use anyhow::{bail, Context}; use aptos::node::analyze::fetch_metadata::FetchMetadata; use aptos_sdk::types::PeerId; -use futures::future::join_all; use serde::Serialize; use std::time::Duration; use transaction_emitter_lib::emitter::stats::TxnStats; @@ -69,12 +68,21 @@ impl SuccessCriteria { } if let Some(timeout) = self.wait_for_all_nodes_to_catchup { - swarm.wait_for_all_nodes_to_catchup_to_next(timeout).await?; + swarm + .wait_for_all_nodes_to_catchup_to_next(timeout) + .await + .context("Failed waiting for all nodes to catchup to next version")?; } if self.check_no_restarts { - swarm.ensure_no_validator_restart().await?; - swarm.ensure_no_fullnode_restart().await?; + swarm + .ensure_no_validator_restart() + .await + .context("Failed ensuring no validator restarted")?; + swarm + .ensure_no_fullnode_restart() + .await + .context("Failed ensuring no fullnode restarted")?; } // TODO(skedia) Add latency success criteria after we have support for querying prometheus @@ -92,7 +100,8 @@ impl SuccessCriteria { if let Some(chain_progress_threshold) = &self.chain_progress_check { self.check_chain_progress(swarm, chain_progress_threshold, start_version, end_version) - .await?; + .await + .context("Failed check chain progress")?; } Ok(()) @@ -106,19 +115,10 @@ impl SuccessCriteria { end_version: u64, ) -> anyhow::Result<()> { // Choose client with newest ledger version to fetch NewBlockEvents from: - let clients = swarm.get_all_nodes_clients_with_names(); - let ledger_infos = join_all( - clients - .iter() - .map(|(_name, client)| client.get_ledger_information()), - ) - .await; - let (_max_v, client) = ledger_infos - .into_iter() - .zip(clients.into_iter()) - .flat_map(|(resp, (_, client))| resp.map(|r| (r.into_inner().version, client))) - .max_by_key(|(v, _c)| *v) - .unwrap(); + let (_max_v, client) = swarm + .get_client_with_newest_ledger_version() + .await + 
.context("No clients replied in check_chain_progress")?; let epochs = FetchMetadata::fetch_new_block_events(&client, None, None) .await diff --git a/testsuite/forge/src/test_utils/consensus_utils.rs b/testsuite/forge/src/test_utils/consensus_utils.rs index 6f87b70273ccc..d60a8a3836a02 100644 --- a/testsuite/forge/src/test_utils/consensus_utils.rs +++ b/testsuite/forge/src/test_utils/consensus_utils.rs @@ -1,7 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use aptos_rest_client::Client as RestClient; use async_trait::async_trait; use chrono::Utc; @@ -159,32 +159,25 @@ pub async fn test_consensus_fault_tolerance( let target_v = largest_v + 10; wait_for_all_nodes_to_catchup_to_version(&validator_clients, target_v, Duration::from_secs(30)) - .await?; + .await + .context("catchup failed")?; let transactions: Vec<_> = join_all(validator_clients.iter().cloned().map(move |v| async move { let mut txns = v.1.get_transactions_bcs(Some(target_v.saturating_sub(1000)), Some(1000)) .await - .map_err(|e| anyhow!("{:?}", e))? + .unwrap() .into_inner(); txns.retain(|t| t.version <= target_v); - >>::Ok(txns) + >>::Ok(txns) })) .await; - let txns_a = transactions - .first() - .unwrap() - .as_ref() - .map_err(|e| anyhow!("{:?}", &e))?; + let txns_a = transactions.first().unwrap().as_ref().unwrap(); for i in 1..transactions.len() { - let txns_b = transactions - .get(i) - .unwrap() - .as_ref() - .map_err(|e| anyhow!("{:?}", &e))?; + let txns_b = transactions.get(i).unwrap().as_ref().unwrap(); assert_eq!( txns_a.len(), txns_b.len(), diff --git a/testsuite/testcases/src/consensus_reliability_tests.rs b/testsuite/testcases/src/consensus_reliability_tests.rs index e30d833f9cace..cbda8611266ad 100644 --- a/testsuite/testcases/src/consensus_reliability_tests.rs +++ b/testsuite/testcases/src/consensus_reliability_tests.rs @@ -2,10 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{LoadDestination, NetworkLoadTest}; -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use aptos_logger::{info, warn}; -use aptos_rest_client::error::RestError; -use aptos_sdk::types::account_config::CORE_CODE_ADDRESS; use forge::test_utils::consensus_utils::{ test_consensus_fault_tolerance, FailPointFailureInjection, NodeState, }; @@ -56,15 +54,6 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest { let num_validators = validators.len(); - let validator_set: serde_json::Value = runtime - .block_on( - validators[0] - .1 - .get_resource(CORE_CODE_ADDRESS, "0x1::stake::ValidatorSet"), - )? 
- .into_inner(); - info!("ValidatorSet : {:?}", validator_set); - let num_always_healthy = self.always_healthy_nodes; // largest number of (small) nodes that can fail simultaneously, while we have enough for quorum let can_fail_for_quorum = @@ -89,23 +78,38 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest { "Always healthy {} nodes, every cycle having {} nodes out of {} down, rotating {} each cycle, expecting first {} validators to have 10x larger stake", num_always_healthy, max_fail_in_test, num_validators, cycle_offset, self.num_large_validators); - if self.add_execution_delay { + let slow_allowed_lagging = if self.add_execution_delay { runtime.block_on(async { let mut rng = rand::thread_rng(); - for (name, validator) in &validators[num_always_healthy..num_validators] { + let mut slow_allowed_lagging = HashSet::new(); + for (index, (name, validator)) in + validators.iter().enumerate().skip(num_always_healthy) + { let sleep_time = rng.gen_range(20, 500); + if sleep_time > 100 { + slow_allowed_lagging.insert(index); + } let name = name.clone(); + validator .set_failpoint( "aptos_vm::execution::block_metadata".to_string(), format!("sleep({})", sleep_time), ) .await - .with_context(|| name)?; + .map_err(|e| { + anyhow!( + "set_failpoint to add execution delay on {} failed, {:?}", + name, + e + ) + })?; } - Ok::<(), RestError>(()) - })?; - } + Ok::<HashSet<usize>, anyhow::Error>(slow_allowed_lagging) + })? + } else { + HashSet::new() + }; let min_tps = self.min_tps; let check_period_s = self.check_period_s; @@ -136,56 +140,96 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest { (vec![], false) } }))), - Box::new(move |cycle, _, _, _, cur, previous| { + Box::new(move |cycle, _, _, _, cycle_end, cycle_start| { + // We group nodes into 3 groups: + // - active - nodes we expect to be making progress, and doing so together. We check a very strict rule of min(cycle_end) vs max(cycle_start). + // - allowed_lagging - nodes that are allowed to lag behind the tip of the chain, but are required to be making individual progress. + // We treat nodes that were recently down as such (while state sync is given time to catch up), as well as nodes that + // had slowness added to their execution via the add_execution_delay param. + // - down - nodes that are cut off from the rest of the nodes, and so shouldn't be seeing any progress. There should be no progress + // on the ordered certificates, but since we only observe committed ones, we allow for minimal progress there, for + // what they already have in the buffer. 
+ let down_indices = down_indices_f(cycle); - let prev_down_indices = if cycle > 0 { down_indices_f(cycle - 1) } else { HashSet::new() }; - fn split(all: Vec<NodeState>, down_indices: &HashSet<usize>) -> (Vec<(usize, NodeState)>, Vec<NodeState>) { - let (down, active): (Vec<_>, Vec<_>) = all.into_iter().enumerate().partition(|(idx, _state)| down_indices.contains(idx)); - (down, active.into_iter().map(|(_idx, state)| state).collect()) + let recently_down_indices = if cycle > 0 { down_indices_f(cycle - 1) } else { HashSet::new() }; + fn split(all: Vec<NodeState>, down_indices: &HashSet<usize>, allowed_lagging_indices: &HashSet<usize>) -> (Vec<(usize, NodeState)>, Vec<(usize, NodeState)>, Vec<NodeState>) { + let (down, not_down): (Vec<_>, Vec<_>) = all.into_iter().enumerate().partition(|(idx, _state)| down_indices.contains(idx)); + let (allowed_lagging, active) = not_down.into_iter().partition(|(idx, _state)| allowed_lagging_indices.contains(idx)); + (down, allowed_lagging, active.into_iter().map(|(_idx, state)| state).collect()) } - let (cur_down, cur_active) = split(cur, &down_indices); - let (prev_down, prev_active) = split(previous, &down_indices); + let allowed_lagging = recently_down_indices.union(&slow_allowed_lagging).cloned().collect::<HashSet<_>>(); + let (cycle_end_down, cycle_end_allowed_lagging, cycle_end_active) = split(cycle_end, &down_indices, &allowed_lagging); + let (cycle_start_down, cycle_start_allowed_lagging, cycle_start_active) = split(cycle_start, &down_indices, &allowed_lagging); - // Make sure that every active node is making progress, so we compare min(cur) vs max(previous) - let (cur_min_epoch, cur_min_round) = cur_active.iter().map(|s| (s.epoch, s.round)).min().unwrap(); - let (prev_max_epoch, prev_max_round) = prev_active.iter().map(|s| (s.epoch, s.round)).max().unwrap(); + // Make sure that every active node is making progress, so we compare min(cycle_end) vs max(cycle_start) + let (cycle_end_min_epoch, cycle_end_min_round) = cycle_end_active.iter().map(|s| (s.epoch, s.round)).min().unwrap(); + let (cycle_start_max_epoch, cycle_start_max_round) = cycle_start_active.iter().map(|s| (s.epoch, s.round)).max().unwrap(); - let epochs_progress = cur_min_epoch as i64 - prev_max_epoch as i64; - let round_progress = cur_min_round as i64 - prev_max_round as i64; + let epochs_progress = cycle_end_min_epoch as i64 - cycle_start_max_epoch as i64; + let round_progress = cycle_end_min_round as i64 - cycle_start_max_round as i64; - let transaction_progress = cur_active.iter().map(|s| s.version).min().unwrap() as i64 - - prev_active.iter().map(|s| s.version).max().unwrap() as i64; + let transaction_progress = cycle_end_active.iter().map(|s| s.version).min().unwrap() as i64 + - cycle_start_active.iter().map(|s| s.version).max().unwrap() as i64; if transaction_progress < (min_tps * check_period_s) as i64 { bail!( - "no progress with active consensus, only {} transactions, expected >= {} ({} TPS). Down indices {:?}, Prev active: {:?}. Cur active: {:?}", + "not enough progress with active consensus, only {} transactions, expected >= {} ({} TPS). Down indices {:?}, cycle start active: {:?}. 
cycle end active: {:?}", transaction_progress, - (min_tps * check_period_s), + min_tps * check_period_s, min_tps, down_indices, - prev_active, - cur_active, + cycle_start_active, + cycle_end_active, ); } if epochs_progress < 0 || (epochs_progress == 0 && round_progress < (check_period_s / 2) as i64) { - bail!("no progress with active consensus, only {} epochs and {} rounds, expectd >= {}", - epochs_progress, - round_progress, - (check_period_s / 2), + bail!( + "not enough progress with active consensus, only {} epochs and {} rounds, expected >= {}", + epochs_progress, + round_progress, + check_period_s / 2, ); } + // Make sure that allowed_lagging nodes are making progress + for ((node_idx, cycle_end_state), (node_idx_p, cycle_start_state)) in cycle_end_allowed_lagging.iter().zip(cycle_start_allowed_lagging.iter()) { + assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cycle_end_allowed_lagging, cycle_start_allowed_lagging); + let transaction_progress = cycle_end_state.version as i64 - cycle_start_state.version as i64; + if transaction_progress < (min_tps * check_period_s) as i64 { + bail!( + "not enough individual progress on allowed lagging node ({}), only {} transactions, expected >= {} ({} TPS)", + node_idx, + transaction_progress, + min_tps * check_period_s, + min_tps, + ); + } + + let epochs_progress = cycle_end_state.epoch as i64 - cycle_start_state.epoch as i64; + let round_progress = cycle_end_state.round as i64 - cycle_start_state.round as i64; + if epochs_progress < 0 || (epochs_progress == 0 && round_progress < (check_period_s / 2) as i64) { + bail!( + "not enough individual progress on allowed lagging node ({}), only {} epochs and {} rounds, expected >= {}. Transaction progress was {}.", + node_idx, + epochs_progress, + round_progress, + check_period_s / 2, + transaction_progress, + ); + } + } + // Make sure down nodes don't make progress: - for ((node_idx, cur_state), (node_idx_p, prev_state)) in cur_down.iter().zip(prev_down.iter()) { - assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cur_down, prev_down); - if cur_state.round > prev_state.round + 3 { + for ((node_idx, cycle_end_state), (node_idx_p, cycle_start_state)) in cycle_end_down.iter().zip(cycle_start_down.iter()) { + assert_eq!(node_idx, node_idx_p, "{:?} {:?}", cycle_end_down, cycle_start_down); + if cycle_end_state.round > cycle_start_state.round + 3 { // if we just failed the node, some progress can happen due to pipeline in consensus, // or buffer of received messages in state sync - if prev_down_indices.contains(node_idx) { - bail!("progress on down node {} from ({}, {}) to ({}, {})", node_idx, prev_state.epoch, prev_state.round, cur_state.epoch, cur_state.round); + if recently_down_indices.contains(node_idx) { + bail!("progress on down node {} from ({}, {}) to ({}, {})", node_idx, cycle_start_state.epoch, cycle_start_state.round, cycle_end_state.epoch, cycle_end_state.round); } else { - warn!("progress on down node {} immediatelly after turning off from ({}, {}) to ({}, {})", node_idx, prev_state.epoch, prev_state.round, cur_state.epoch, cur_state.round) + warn!("progress on down node {} immediately after turning off from ({}, {}) to ({}, {})", node_idx, cycle_start_state.epoch, cycle_start_state.round, cycle_end_state.epoch, cycle_end_state.round) } } } @@ -194,8 +238,31 @@ impl NetworkLoadTest for ChangingWorkingQuorumTest { }), false, true, - ))?; + )).context("test_consensus_fault_tolerance failed")?; + // undo slowing down. 
+ if self.add_execution_delay { + runtime.block_on(async { + for (name, validator) in validators.iter().skip(num_always_healthy) { + let name = name.clone(); + + validator + .set_failpoint( + "aptos_vm::execution::block_metadata".to_string(), + "off".to_string(), + ) + .await + .map_err(|e| { + anyhow!( + "set_failpoint to remove execution delay on {} failed, {:?}", + name, + e + ) + })?; + } + Ok::<(), anyhow::Error>(()) + })?; + } Ok(()) } } diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index eae8cae668f7e..7c2ec668a3dee 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -20,7 +20,7 @@ pub mod twin_validator_test; pub mod validator_join_leave_test; pub mod validator_reboot_stress_test; -use anyhow::{anyhow, ensure}; +use anyhow::{anyhow, ensure, Context}; use aptos_logger::info; use aptos_sdk::{transaction_builder::TransactionFactory, types::PeerId}; use forge::{ @@ -31,6 +31,7 @@ use futures::future::join_all; use rand::{rngs::StdRng, SeedableRng}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::runtime::Builder; +use tokio::runtime::Runtime; const WARMUP_DURATION_FRACTION: f32 = 0.07; const COOLDOWN_DURATION_FRACTION: f32 = 0.04; @@ -132,16 +133,14 @@ pub trait NetworkLoadTest: Test { impl NetworkTest for dyn NetworkLoadTest { fn run<'t>(&self, ctx: &mut NetworkContext<'t>) -> Result<()> { + let runtime = Runtime::new().unwrap(); let start_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards") .as_secs(); - let one_client = ctx.swarm().aptos_public_info().client().clone(); - let start_version = ctx - .runtime - .block_on(one_client.get_ledger_information())? - .into_inner() - .version; + let (start_version, _) = runtime + .block_on(ctx.swarm().get_client_with_newest_ledger_version()) + .context("no clients replied for start version")?; let emit_job_request = ctx.emit_job.clone(); let rng = SeedableRng::from_rng(ctx.core().rng())?; let duration = ctx.global_duration; @@ -160,13 +159,12 @@ impl NetworkTest for dyn NetworkLoadTest { .duration_since(UNIX_EPOCH) .expect("Time went backwards") .as_secs(); - let end_version = ctx - .runtime - .block_on(one_client.get_ledger_information())? - .into_inner() - .version; + let (end_version, _) = runtime + .block_on(ctx.swarm().get_client_with_newest_ledger_version()) + .context("no clients replied for end version")?; - self.finish(ctx.swarm())?; + self.finish(ctx.swarm()) + .context("finish NetworkLoadTest ")?; ctx.check_for_success( &txn_stat, @@ -175,7 +173,8 @@ impl NetworkTest for dyn NetworkLoadTest { end_timestamp as i64, start_version, end_version, - )?; + ) + .context("check for success")?; Ok(()) } @@ -203,7 +202,7 @@ impl dyn NetworkLoadTest { .map(|v| v.peer_id()) .collect::>(); - let nodes_to_send_load_to = match self.setup(ctx)? { + let nodes_to_send_load_to = match self.setup(ctx).context("setup NetworkLoadTest")? 
{ LoadDestination::AllNodes => [&all_validators[..], &all_fullnodes[..]].concat(), LoadDestination::AllValidators => all_validators, LoadDestination::AllFullnodes => all_fullnodes, @@ -218,7 +217,8 @@ impl dyn NetworkLoadTest { &nodes_to_send_load_to, aptos_global_constants::GAS_UNIT_PRICE, rng, - )?; + ) + .context("create emitter")?; let mut runtime_builder = Builder::new_multi_thread(); runtime_builder.disable_lifo_slot().enable_all(); @@ -231,23 +231,9 @@ impl dyn NetworkLoadTest { .swarm() .get_clients_for_peers(&nodes_to_send_load_to, Duration::from_secs(10)); - // Read first - for client in &clients { - let start = Instant::now(); - let _v = rt.block_on(client.get_ledger_information())?; - let duration = start.elapsed(); - info!( - "Fetch from {:?} took {}ms", - client.path_prefix_string(), - duration.as_millis(), - ); - } - - let job = rt.block_on(emitter.start_job( - ctx.swarm().chain_info().root_account, - emit_job_request, - 3, - ))?; + let job = rt + .block_on(emitter.start_job(ctx.swarm().chain_info().root_account, emit_job_request, 3)) + .context("start emitter job")?; let warmup_duration = duration.mul_f32(warmup_duration_fraction); let cooldown_duration = duration.mul_f32(cooldown_duration_fraction); @@ -270,7 +256,8 @@ impl dyn NetworkLoadTest { job.start_next_phase(); let test_start = Instant::now(); - self.test(ctx.swarm(), test_duration)?; + self.test(ctx.swarm(), test_duration) + .context("test NetworkLoadTest")?; let actual_test_duration = test_start.elapsed(); info!( "{}s test finished after {}s",