Skip to content

Commit

Permalink
[Aptos Data Client] Update tests for multi-fetch.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Dec 13, 2023
1 parent 477d417 commit f428324
Show file tree
Hide file tree
Showing 9 changed files with 2,227 additions and 1,428 deletions.
200 changes: 126 additions & 74 deletions state-sync/aptos-data-client/src/tests/advertise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
interface::AptosDataClientInterface,
peer_states::calculate_optimal_chunk_sizes,
poller,
priority::PeerPriority,
tests::{mock::MockNetwork, utils},
};
use aptos_config::{config::AptosDataClientConfig, network_id::PeerNetworkId};
Expand All @@ -19,29 +20,31 @@ use claims::assert_matches;

#[tokio::test]
async fn request_works_only_when_data_available() {
// Ensure the properties hold for both priority and non-priority peers
for poll_priority_peers in [true, false] {
// Ensure the properties hold for all peer priorities
for peer_priority in PeerPriority::get_all_ordered_priorities() {
// Create a base config for a validator
let base_config = utils::create_validator_base_config();

// Create the mock network, mock time, client and poller
let data_client_config = AptosDataClientConfig::default();
let (mut mock_network, mut mock_time, client, poller) =
MockNetwork::new(None, Some(data_client_config), None);
MockNetwork::new(Some(base_config), Some(data_client_config), None);

// Start the poller
tokio::spawn(poller::start_poller(poller));

// Request transactions and verify the request fails (no peers are connected)
fetch_transactions_and_verify_failure(&data_client_config, &client, 100).await;
fetch_transactions_and_verify_failure(&data_client_config, &client, 100, true).await;

// Add a connected peer
let (peer, network_id) = utils::add_peer_to_network(poll_priority_peers, &mut mock_network);
let (peer, network_id) = utils::add_peer_to_network(peer_priority, &mut mock_network);

// Verify the peer's state has not been updated
let peer_states = client.get_peer_states();
let peer_to_states = peer_states.get_peer_to_states();
let peer_to_states = client.get_peer_states().get_peer_to_states();
assert!(peer_to_states.is_empty());

// Request transactions and verify the request fails (no peers are advertising data)
fetch_transactions_and_verify_failure(&data_client_config, &client, 100).await;
fetch_transactions_and_verify_failure(&data_client_config, &client, 100, false).await;

// Advance time so the poller sends a data summary request
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;
Expand Down Expand Up @@ -95,10 +98,13 @@ async fn request_works_only_when_data_available() {

#[tokio::test]
async fn update_global_data_summary() {
// Create a base config for a validator
let base_config = utils::create_validator_base_config();

// Create the mock network, mock time, client and poller
let data_client_config = AptosDataClientConfig::default();
let (mut mock_network, mut mock_time, client, poller) =
MockNetwork::new(None, Some(data_client_config), None);
MockNetwork::new(Some(base_config), Some(data_client_config), None);

// Start the poller
tokio::spawn(poller::start_poller(poller));
Expand All @@ -107,109 +113,150 @@ async fn update_global_data_summary() {
let global_data_summary = client.get_global_data_summary();
assert!(global_data_summary.is_empty());

// Add a priority peer
let (_, priority_network) = utils::add_peer_to_network(true, &mut mock_network);

// Advance time so the poller sends a data summary request to the peer
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Handle the priority peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, priority_network).await;
let priority_peer_version = 10_000;
let priority_storage_summary = utils::create_storage_summary(priority_peer_version);
let data_response = DataResponse::StorageServerSummary(priority_storage_summary.clone());
network_request
.response_sender
.send(Ok(StorageServiceResponse::new(data_response, true).unwrap()));

// Advance time so the poller updates the global data summary
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Verify that the advertised data ranges are valid
verify_advertised_transaction_data(&client, priority_peer_version, 1, true);
// Add several peers of different priorities and advertise data for them
let mut advertised_peer_versions = vec![];
for (index, peer_priority) in PeerPriority::get_all_ordered_priorities()
.iter()
.enumerate()
{
// Add the peer
let (_, network_id) = utils::add_peer_to_network(*peer_priority, &mut mock_network);

// Add a regular peer
let (_, regular_network) = utils::add_peer_to_network(false, &mut mock_network);
// Advance time so the poller sends a data summary request
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Advance time so the poller sends a data summary request for both peers
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;
// Create the peer's storage summary
let peer_version = ((index + 1) * 10000) as u64;
let storage_summary = utils::create_storage_summary(peer_version);
advertised_peer_versions.push(peer_version);

// Handle the priority peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, priority_network).await;
let priority_peer_version = 20_000;
let priority_storage_summary = utils::create_storage_summary(priority_peer_version);
utils::handle_storage_summary_request(network_request, priority_storage_summary.clone());
// Handle the peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, network_id).await;
let data_response = DataResponse::StorageServerSummary(storage_summary.clone());
network_request
.response_sender
.send(Ok(StorageServiceResponse::new(data_response, true).unwrap()));

// Handle the regular peer's data summary request (using more data)
let network_request = utils::get_network_request(&mut mock_network, regular_network).await;
let regular_peer_version = 30_000;
let regular_storage_summary = utils::create_storage_summary(regular_peer_version);
utils::handle_storage_summary_request(network_request, regular_storage_summary.clone());
// Advance time so the poller updates the global data summary
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Advance time so the poller elapses
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;
// Verify that the advertised data ranges are valid
verify_advertised_transaction_data(&client, peer_version, index + 1, true);
}

// Verify that the advertised data ranges are valid
verify_advertised_transaction_data(&client, priority_peer_version, 2, false);
verify_advertised_transaction_data(&client, regular_peer_version, 2, true);
// Verify that the advertised data ranges are all present
for (index, peer_version) in advertised_peer_versions.iter().enumerate() {
let is_highest_version = index == advertised_peer_versions.len() - 1;
verify_advertised_transaction_data(
&client,
*peer_version,
advertised_peer_versions.len(),
is_highest_version,
);
}
}

#[tokio::test]
async fn update_peer_states() {
// Create a base config for a validator
let base_config = utils::create_validator_base_config();

// Create the mock network, mock time, client and poller
let data_client_config = AptosDataClientConfig::default();
let (mut mock_network, mut mock_time, client, poller) =
MockNetwork::new(None, Some(data_client_config), None);
MockNetwork::new(Some(base_config), Some(data_client_config), None);

// Start the poller
tokio::spawn(poller::start_poller(poller));

// Add a priority peer
let (priority_peer, priority_network) = utils::add_peer_to_network(true, &mut mock_network);
// Add a high priority peer
let (high_priority_peer, high_priority_network) =
utils::add_peer_to_network(PeerPriority::HighPriority, &mut mock_network);

// Verify that we have no peer states
let peer_states = client.get_peer_states();
let peer_to_states = peer_states.get_peer_to_states();
let peer_to_states = client.get_peer_states().get_peer_to_states();
assert!(peer_to_states.is_empty());

// Advance time so the poller sends a data summary request for the peer
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Handle the priority peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, priority_network).await;
let priority_storage_summary = utils::create_storage_summary(1111);
utils::handle_storage_summary_request(network_request, priority_storage_summary.clone());
// Handle the high priority peer's data summary request
let network_request =
utils::get_network_request(&mut mock_network, high_priority_network).await;
let high_priority_storage_summary = utils::create_storage_summary(1111);
utils::handle_storage_summary_request(network_request, high_priority_storage_summary.clone());

// Let the poller finish processing the responses
tokio::task::yield_now().await;

// Verify that the priority peer's state has been updated
verify_peer_state(&client, priority_peer, priority_storage_summary);
// Verify that the high priority peer's state has been updated
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);

// Add a regular peer
let (regular_peer, regular_network) = utils::add_peer_to_network(false, &mut mock_network);
// Add a medium priority peer
let (medium_priority_peer, medium_priority_network) =
utils::add_peer_to_network(PeerPriority::MediumPriority, &mut mock_network);

// Advance time so the poller sends a data summary request for both peers
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Handle the priority peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, priority_network).await;
let priority_storage_summary = utils::create_storage_summary(2222);
utils::handle_storage_summary_request(network_request, priority_storage_summary.clone());
// Handle the high priority peer's data summary request
let network_request =
utils::get_network_request(&mut mock_network, high_priority_network).await;
let high_priority_storage_summary = utils::create_storage_summary(2222);
utils::handle_storage_summary_request(network_request, high_priority_storage_summary.clone());

// Handle the regular peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, regular_network).await;
let regular_storage_summary = utils::create_storage_summary(3333);
utils::handle_storage_summary_request(network_request, regular_storage_summary.clone());
// Handle the medium peer's data summary request
let network_request =
utils::get_network_request(&mut mock_network, medium_priority_network).await;
let medium_priority_storage_summary = utils::create_storage_summary(3333);
utils::handle_storage_summary_request(network_request, medium_priority_storage_summary.clone());

// Let the poller finish processing the responses
tokio::task::yield_now().await;

// Verify that the priority peer's state has been updated
verify_peer_state(&client, priority_peer, priority_storage_summary);
// Verify that the peer's states have been set
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);
verify_peer_state(
&client,
medium_priority_peer,
medium_priority_storage_summary,
);

// Add a low priority peer
let (low_priority_peer, low_priority_network) =
utils::add_peer_to_network(PeerPriority::LowPriority, &mut mock_network);

// Advance time so the poller sends a data summary request for all peers
utils::advance_polling_timer(&mut mock_time, &data_client_config).await;

// Handle the high priority peer's data summary request
let network_request =
utils::get_network_request(&mut mock_network, high_priority_network).await;
let high_priority_storage_summary = utils::create_storage_summary(4444);
utils::handle_storage_summary_request(network_request, high_priority_storage_summary.clone());

// Handle the medium peer's data summary request
let network_request =
utils::get_network_request(&mut mock_network, medium_priority_network).await;
let medium_priority_storage_summary = utils::create_storage_summary(5555);
utils::handle_storage_summary_request(network_request, medium_priority_storage_summary.clone());

// Handle the low priority peer's data summary request
let network_request = utils::get_network_request(&mut mock_network, low_priority_network).await;
let low_priority_storage_summary = utils::create_storage_summary(6666);
utils::handle_storage_summary_request(network_request, low_priority_storage_summary.clone());

// Let the poller finish processing the responses
tokio::task::yield_now().await;

// Verify that the regular peer's state has been set
verify_peer_state(&client, regular_peer, regular_storage_summary);
// Verify that the peer's states have been set
verify_peer_state(&client, high_priority_peer, high_priority_storage_summary);
verify_peer_state(
&client,
medium_priority_peer,
medium_priority_storage_summary,
);
verify_peer_state(&client, low_priority_peer, low_priority_storage_summary);
}

#[tokio::test]
Expand Down Expand Up @@ -276,6 +323,7 @@ async fn fetch_transactions_and_verify_failure(
data_client_config: &AptosDataClientConfig,
data_client: &AptosDataClient,
version: u64,
no_connected_peers: bool,
) {
// Request the transactions with proof
let request_timeout = data_client_config.response_timeout_ms;
Expand All @@ -285,7 +333,11 @@ async fn fetch_transactions_and_verify_failure(
.unwrap_err();

// Verify the error is correct
assert_matches!(error, Error::DataIsUnavailable(_));
if no_connected_peers {
assert_matches!(error, Error::NoConnectedPeers(_));
} else {
assert_matches!(error, Error::DataIsUnavailable(_));
}
}

/// Verifies that the advertised transaction data is valid
Expand Down
Loading

0 comments on commit f428324

Please sign in to comment.