Skip to content

Commit

Permalink
[State Sync] Continuous syncer unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed May 4, 2022
1 parent 6ccbbbd commit 0c46c99
Show file tree
Hide file tree
Showing 5 changed files with 471 additions and 97 deletions.
168 changes: 90 additions & 78 deletions state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use crate::{
error::Error,
tests::{
mocks::{
create_mock_db_reader, create_mock_storage_synchronizer, create_mock_streaming_client,
MockDatabaseReader, MockStorageSynchronizer, MockStreamingClient,
create_mock_db_reader, create_mock_streaming_client, create_ready_storage_synchronizer,
MockStorageSynchronizer, MockStreamingClient,
},
utils::{
create_full_node_driver_configuration, create_output_list_with_proof,
create_data_stream_listener, create_full_node_driver_configuration,
create_global_summary, create_output_list_with_proof,
create_random_epoch_ending_ledger_info, create_startup_info, create_transaction_info,
create_transaction_list_with_proof,
},
Expand All @@ -23,17 +24,14 @@ use aptos_types::{
transaction::{TransactionOutputListWithProof, Version},
waypoint::Waypoint,
};
use channel::{aptos_channel, aptos_channel::Sender, message_queues::QueueStyle};
use claim::{assert_matches, assert_none, assert_ok};
use data_streaming_service::{
data_notification::{DataNotification, DataPayload},
data_stream::DataStreamListener,
streaming_client::{Epoch, NotificationFeedback},
streaming_client::NotificationFeedback,
};
use futures::{channel::oneshot, FutureExt};
use mockall::predicate::eq;
use mockall::{predicate::eq, Sequence};
use std::sync::Arc;
use storage_service_types::CompleteDataRange;

#[tokio::test]
async fn test_bootstrap_genesis_waypoint() {
Expand Down Expand Up @@ -140,11 +138,17 @@ async fn test_critical_timeout() {

// Create the mock streaming client
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener));
let mut expectation_sequence = Sequence::new();
let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.times(1)
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}

// Create the bootstrapper
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client);
Expand All @@ -157,7 +161,7 @@ async fn test_critical_timeout() {
.await
.unwrap();

// Drive progress twice and verify we get timeout errors
// Drive progress twice and verify we get non-critical timeouts
for _ in 0..2 {
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
Expand All @@ -170,6 +174,17 @@ async fn test_critical_timeout() {
.await
.unwrap_err();
assert_matches!(error, Error::CriticalDataStreamTimeout(_));

// Drive progress to initialize the epoch ending data stream again
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();

// Drive progress again and verify we get a non-critical timeout
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}

#[tokio::test]
Expand All @@ -185,15 +200,21 @@ async fn test_data_stream_accounts() {

// Create the mock streaming client
let mut mock_streaming_client = create_mock_streaming_client();
let (notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_transaction_outputs()
.with(
eq(highest_version),
eq(highest_version),
eq(highest_version),
)
.return_once(move |_, _, _| Ok(data_stream_listener));
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transaction_outputs()
.times(1)
.with(
eq(highest_version),
eq(highest_version),
eq(highest_version),
)
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
Expand Down Expand Up @@ -222,13 +243,18 @@ async fn test_data_stream_accounts() {
notification_id,
data_payload: DataPayload::TransactionOutputsWithProof(create_output_list_with_proof()),
};
notification_sender.push((), data_notification).unwrap();
notification_sender_1.push((), data_notification).unwrap();

// Drive progress again and ensure we get a verification error
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));

// Drive progress to initialize the account states stream
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}

#[tokio::test]
Expand All @@ -244,12 +270,18 @@ async fn test_data_stream_transactions() {
BootstrappingMode::ExecuteTransactionsFromGenesis;

// Create the mock streaming client
let (notification_sender, data_stream_listener) = create_data_stream_listener();
let mut mock_streaming_client = create_mock_streaming_client();
mock_streaming_client
.expect_get_all_transactions()
.with(eq(1), eq(highest_version), eq(highest_version), eq(false))
.return_once(move |_, _, _, _| Ok(data_stream_listener));
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transactions()
.times(1)
.with(eq(1), eq(highest_version), eq(highest_version), eq(false))
.return_once(move |_, _, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
Expand Down Expand Up @@ -278,13 +310,18 @@ async fn test_data_stream_transactions() {
notification_id,
data_payload: DataPayload::TransactionsWithProof(create_transaction_list_with_proof()),
};
notification_sender.push((), data_notification).unwrap();
notification_sender_1.push((), data_notification).unwrap();

// Drive progress again and ensure we get a verification error
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));

// Drive progress to initialize the transaction output stream
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}

#[tokio::test]
Expand All @@ -301,11 +338,17 @@ async fn test_data_stream_transaction_outputs() {

// Create the mock streaming client
let mut mock_streaming_client = create_mock_streaming_client();
let (notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_transaction_outputs()
.with(eq(1), eq(highest_version), eq(highest_version))
.return_once(move |_, _, _| Ok(data_stream_listener));
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transaction_outputs()
.times(1)
.with(eq(1), eq(highest_version), eq(highest_version))
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
Expand Down Expand Up @@ -336,13 +379,18 @@ async fn test_data_stream_transaction_outputs() {
TransactionOutputListWithProof::new_empty(),
),
};
notification_sender.push((), data_notification).unwrap();
notification_sender_1.push((), data_notification).unwrap();

// Drive progress again and ensure we get a verification error
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));

// Drive progress to initialize the transaction output stream
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -519,27 +567,6 @@ fn create_bootstrapper(
let mock_storage_synchronizer = create_ready_storage_synchronizer();

// Create the mock db reader with only genesis loaded
let mock_database_reader = create_mock_db_reader_with_genesis();

Bootstrapper::new(
driver_configuration,
mock_streaming_client,
Arc::new(mock_database_reader),
mock_storage_synchronizer,
)
}

/// Creates a new data stream listener and notification sender pair
fn create_data_stream_listener() -> (Sender<(), DataNotification>, DataStreamListener) {
let (notification_sender, notification_receiver) =
aptos_channel::new(QueueStyle::KLAST, 100, None);
let data_stream_listener = DataStreamListener::new(notification_receiver);

(notification_sender, data_stream_listener)
}

/// Creates a mock database reader with genesis already in the db
fn create_mock_db_reader_with_genesis() -> MockDatabaseReader {
let mut mock_database_reader = create_mock_db_reader();
mock_database_reader
.expect_get_startup_info()
Expand All @@ -548,27 +575,12 @@ fn create_mock_db_reader_with_genesis() -> MockDatabaseReader {
.expect_get_latest_transaction_info_option()
.returning(|| Ok(Some((0, create_transaction_info()))));

mock_database_reader
}

/// Creates a mock storage synchronizer that is not currently handling
/// any pending storage data.
fn create_ready_storage_synchronizer() -> MockStorageSynchronizer {
let mut mock_storage_synchronizer = create_mock_storage_synchronizer();
mock_storage_synchronizer
.expect_pending_storage_data()
.return_const(false);

mock_storage_synchronizer
}

/// Creates a global data summary with the highest ended epoch
fn create_global_summary(highest_ended_epoch: Epoch) -> GlobalDataSummary {
let mut global_data_summary = GlobalDataSummary::empty();
global_data_summary
.advertised_data
.epoch_ending_ledger_infos = vec![CompleteDataRange::new(0, highest_ended_epoch).unwrap()];
global_data_summary
Bootstrapper::new(
driver_configuration,
mock_streaming_client,
Arc::new(mock_database_reader),
mock_storage_synchronizer,
)
}

/// Drives progress for the given bootstrapper. If `until_bootstrapped`
Expand Down
Loading

0 comments on commit 0c46c99

Please sign in to comment.