Skip to content

Commit

Permalink
[State Sync] Update unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Mar 24, 2024
1 parent dcbee7f commit 536abdc
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 128 deletions.
138 changes: 88 additions & 50 deletions state-sync/aptos-data-client/src/tests/advertise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ use aptos_storage_service_types::{
requests::{DataRequest, TransactionsWithProofRequest},
responses::{CompleteDataRange, DataResponse, StorageServerSummary, StorageServiceResponse},
};
use aptos_time_service::MockTimeService;
use aptos_types::transaction::{TransactionListWithProof, Version};
use claims::assert_matches;
use std::time::Duration;
use tokio::time::timeout;

#[tokio::test]
async fn request_works_only_when_data_available() {
Expand Down Expand Up @@ -79,12 +82,7 @@ async fn request_works_only_when_data_available() {
});

// Verify the peer's state has been updated
let peer_state = peer_to_states.get(&peer).unwrap().value().clone();
let peer_storage_summary = peer_state
.get_storage_summary_if_not_ignored()
.unwrap()
.clone();
assert_eq!(peer_storage_summary, storage_summary);
verify_peer_state(&client, peer, storage_summary).await;

// Request transactions and verify the request succeeds
let request_timeout = data_client_config.response_timeout_ms;
Expand Down Expand Up @@ -141,18 +139,29 @@ async fn update_global_data_summary() {
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Verify that the advertised data ranges are valid
verify_advertised_transaction_data(&client, peer_version, index + 1, true);
verify_advertised_transaction_data(
&mut mock_time,
&data_client_config,
&client,
peer_version,
index + 1,
true,
)
.await;
}

// Verify that the advertised data ranges are all present
for (index, peer_version) in advertised_peer_versions.iter().enumerate() {
let is_highest_version = index == advertised_peer_versions.len() - 1;
verify_advertised_transaction_data(
&mut mock_time,
&data_client_config,
&client,
*peer_version,
advertised_peer_versions.len(),
is_highest_version,
);
)
.await;
}
}

Expand Down Expand Up @@ -190,7 +199,7 @@ async fn update_peer_states() {
tokio::task::yield_now().await;

// Verify that the high priority peer's state has been updated
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary).await;

// Add a medium priority peer
let (medium_priority_peer, medium_priority_network) =
Expand All @@ -215,12 +224,13 @@ async fn update_peer_states() {
tokio::task::yield_now().await;

// Verify that the peer's states have been set
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary).await;
verify_peer_state(
&client,
medium_priority_peer,
medium_priority_storage_summary,
);
)
.await;

// Add a low priority peer
let (low_priority_peer, low_priority_network) =
Expand Down Expand Up @@ -250,13 +260,14 @@ async fn update_peer_states() {
tokio::task::yield_now().await;

// Verify that the peer's states have been set
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary).await;
verify_peer_state(
&client,
medium_priority_peer,
medium_priority_storage_summary,
);
verify_peer_state(&client, low_priority_peer, low_priority_storage_summary);
)
.await;
verify_peer_state(&client, low_priority_peer, low_priority_storage_summary).await;
}

#[tokio::test]
Expand Down Expand Up @@ -341,51 +352,78 @@ async fn fetch_transactions_and_verify_failure(
}

/// Verifies that the advertised transaction data is valid
fn verify_advertised_transaction_data(
async fn verify_advertised_transaction_data(
mock_time: &mut MockTimeService,
data_client_config: &AptosDataClientConfig,
client: &AptosDataClient,
advertised_version: Version,
expected_num_advertisements: usize,
is_highest_version: bool,
) {
// Get the advertised data
let global_data_summary = client.get_global_data_summary();
let advertised_data = global_data_summary.advertised_data;

// Verify the number of advertised entries
assert_eq!(
advertised_data.transactions.len(),
expected_num_advertisements
);

// Verify that the advertised transaction data contains an entry for the given version
assert!(advertised_data
.transactions
.contains(&CompleteDataRange::new(0, advertised_version).unwrap()));

// Verify that the highest synced ledger info is valid (if this is the highest advertised version)
if is_highest_version {
let highest_synced_ledger_info = advertised_data.highest_synced_ledger_info().unwrap();
assert_eq!(
highest_synced_ledger_info.ledger_info().version(),
advertised_version
);
}
// Wait for the advertised data to be updated
timeout(Duration::from_secs(10), async {
loop {
// Advance time so the poller updates the global data summary
utils::advance_polling_timer(mock_time, data_client_config).await;

// Sleep for a while before retrying
tokio::time::sleep(Duration::from_millis(100)).await;

// Get the advertised data
let global_data_summary = client.get_global_data_summary();
let advertised_data = global_data_summary.advertised_data;

// Verify the number of advertised entries
if advertised_data.transactions.len() != expected_num_advertisements {
continue; // The advertised data has not been updated yet
}

// Verify that the advertised transaction data contains an entry for the given version
let transaction_range = CompleteDataRange::new(0, advertised_version).unwrap();
if !advertised_data.transactions.contains(&transaction_range) {
continue; // The advertised data has not been updated yet
}

// Verify that the highest synced ledger info is valid (if this is the highest advertised version)
if is_highest_version {
let highest_synced_ledger_info =
advertised_data.highest_synced_ledger_info().unwrap();
if highest_synced_ledger_info.ledger_info().version() != advertised_version {
continue; // The advertised data has not been updated yet
}
}

// All checks passed
return;
}
})
.await
.expect("The advertised data was not updated correctly! Timed out!");
}

/// Verifies that the peer's state is valid (i.e., the storage summary is correct)
fn verify_peer_state(
/// Verifies that the peer's state is updated to the correct value
async fn verify_peer_state(
client: &AptosDataClient,
peer: PeerNetworkId,
expected_storage_summary: StorageServerSummary,
) {
// Get the peer's state
let peer_to_states = client.get_peer_states().get_peer_to_states();
let peer_state = peer_to_states.get(&peer).unwrap().value().clone();

// Verify that the peer's storage summary is valid
let peer_storage_summary = peer_state
.get_storage_summary_if_not_ignored()
.unwrap()
.clone();
assert_eq!(peer_storage_summary, expected_storage_summary);
// Wait for the peer's state to be updated to the expected storage summary
timeout(Duration::from_secs(10), async {
loop {
// Check if the peer's state has been updated
let peer_to_states = client.get_peer_states().get_peer_to_states();
if let Some(peer_state) = peer_to_states.get(&peer) {
if let Some(storage_summary) = peer_state.get_storage_summary_if_not_ignored() {
if storage_summary == &expected_storage_summary {
return; // The peer's state has been updated correctly
}
}
}

// Sleep for a while before retrying
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.expect("The peer state was not updated to the expected storage summary! Timed out!");
}
Loading

0 comments on commit 536abdc

Please sign in to comment.