Skip to content

Commit

Permalink
[State Sync] Update the unit tests for async mempool notifications.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Apr 26, 2022
1 parent 64699ca commit 042a43a
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,33 @@ use crate::{
error::Error,
notification_handlers::{
CommitNotification, CommitNotificationListener, CommittedTransactions,
ErrorNotificationListener,
ErrorNotificationListener, MempoolNotificationHandler,
},
storage_synchronizer::{StorageSynchronizer, StorageSynchronizerInterface},
tests::{
mocks::{
create_mock_db_writer, create_mock_executor, create_mock_reader_writer,
create_mock_receiver, MockChunkExecutor,
create_mock_db_reader, create_mock_db_writer, create_mock_executor,
create_mock_reader_writer, create_mock_receiver, MockChunkExecutor,
},
utils::{
create_epoch_ending_ledger_info, create_event, create_output_list_with_proof,
create_state_value_chunk_with_proof, create_transaction,
create_startup_info, create_state_value_chunk_with_proof, create_transaction,
create_transaction_list_with_proof,
},
},
};
use anyhow::format_err;
use aptos_config::config::StateSyncDriverConfig;
use aptos_infallible::{Mutex, RwLock};
use aptos_types::{
contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures, transaction::Transaction,
contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures,
on_chain_config::ON_CHAIN_CONFIG_REGISTRY, transaction::Transaction,
};
use claim::assert_matches;
use data_streaming_service::data_notification::NotificationId;
use event_notifications::{EventNotificationListener, EventSubscriptionService};
use futures::StreamExt;
use mempool_notifications::{CommittedTransaction, MempoolNotificationListener};
use mockall::predicate::{always, eq};
use std::{sync::Arc, time::Duration};
use storage_interface::DbReaderWriter;
Expand All @@ -53,9 +57,24 @@ async fn test_apply_transaction_outputs() {
.expect_commit_chunk()
.return_once(move || expected_commit_return);

// Set up the mock db reader
let mut db_reader = create_mock_db_reader();
db_reader
.expect_get_startup_info()
.returning(|| Ok(Some(create_startup_info())));

// Create the storage synchronizer
let (mut commit_listener, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(chunk_executor, create_mock_reader_writer(None, None));
let (_, _, event_subscription_service, mut mempool_listener, mut storage_synchronizer, _, _) =
create_storage_synchronizer(
chunk_executor,
create_mock_reader_writer(Some(db_reader), None),
);

// Subscribe to the expected event
let mut event_listener = event_subscription_service
.lock()
.subscribe_to_events(vec![*event_to_commit.key()])
.unwrap();

// Attempt to apply a chunk of outputs
storage_synchronizer
Expand All @@ -67,9 +86,10 @@ async fn test_apply_transaction_outputs() {
)
.unwrap();

// Verify we get a commit notification and that there's no pending data
verify_transaction_commit_notification(
&mut commit_listener,
// Verify we get a mempool and event notification. Also verify that there's no pending data.
verify_mempool_and_event_notification(
Some(&mut event_listener),
&mut mempool_listener,
vec![transaction_to_commit],
vec![event_to_commit],
)
Expand All @@ -87,7 +107,7 @@ async fn test_apply_transaction_outputs_error() {
.returning(|_, _, _| Err(format_err!("Failed to apply chunk!")));

// Create the storage synchronizer
let (_, mut error_listener, mut storage_synchronizer, _, _) =
let (_, mut error_listener, _, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(chunk_executor, create_mock_reader_writer(None, None));

// Attempt to apply a chunk of outputs
Expand Down Expand Up @@ -119,7 +139,7 @@ async fn test_commit_chunk_error() {
.return_once(|| Err(format_err!("Failed to commit chunk!")));

// Create the storage synchronizer
let (_, mut error_listener, mut storage_synchronizer, _, _) =
let (_, mut error_listener, _, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(chunk_executor, create_mock_reader_writer(None, None));

// Attempt to execute a chunk of transactions
Expand Down Expand Up @@ -158,9 +178,24 @@ async fn test_execute_transactions() {
.expect_commit_chunk()
.return_once(move || expected_execute_return);

// Set up the mock db reader
let mut db_reader = create_mock_db_reader();
db_reader
.expect_get_startup_info()
.returning(|| Ok(Some(create_startup_info())));

// Create the storage synchronizer
let (mut commit_listener, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(chunk_executor, create_mock_reader_writer(None, None));
let (_, _, event_subscription_service, mut mempool_listener, mut storage_synchronizer, _, _) =
create_storage_synchronizer(
chunk_executor,
create_mock_reader_writer(Some(db_reader), None),
);

// Subscribe to the expected event
let mut event_listener = event_subscription_service
.lock()
.subscribe_to_events(vec![*event_to_commit.key()])
.unwrap();

// Attempt to execute a chunk of transactions
storage_synchronizer
Expand All @@ -172,9 +207,10 @@ async fn test_execute_transactions() {
)
.unwrap();

// Verify we get a commit notification and that there's no pending data
verify_transaction_commit_notification(
&mut commit_listener,
// Verify we get a mempool and event notification. Also verify that there's no pending data.
verify_mempool_and_event_notification(
Some(&mut event_listener),
&mut mempool_listener,
vec![transaction_to_commit],
vec![event_to_commit],
)
Expand All @@ -192,7 +228,7 @@ async fn test_execute_transactions_error() {
.returning(|_, _, _| Err(format_err!("Failed to execute chunk!")));

// Create the storage synchronizer
let (_, mut error_listener, mut storage_synchronizer, _, _) =
let (_, mut error_listener, _, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(chunk_executor, create_mock_reader_writer(None, None));

// Attempt to execute a chunk of transactions
Expand Down Expand Up @@ -235,10 +271,11 @@ async fn test_initialize_account_synchronizer() {
.return_once(move |_, _| Ok(Box::new(snapshot_receiver)));

// Create the storage synchronizer
let (mut commit_listener, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, Some(db_writer)),
);
let (mut commit_listener, _, _, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, Some(db_writer)),
);

// Initialize the account synchronizer
let _ = storage_synchronizer
Expand Down Expand Up @@ -267,7 +304,7 @@ async fn test_initialize_account_synchronizer_missing_info() {
output_list_with_proof.proof.transaction_infos = vec![]; // This is invalid!

// Create the storage synchronizer
let (_, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
let (_, _, _, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, None),
);
Expand Down Expand Up @@ -295,7 +332,7 @@ async fn test_initialize_account_synchronizer_receiver_error() {
.returning(|_, _| Err(format_err!("Failed to get snapshot receiver!")));

// Create the storage synchronizer
let (_, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
let (_, _, _, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, Some(db_writer)),
);
Expand Down Expand Up @@ -336,6 +373,12 @@ async fn test_save_account_states_completion() {
let mut chunk_executor = create_mock_executor();
chunk_executor.expect_reset().returning(|| Ok(()));

// Set up the mock db reader
let mut db_reader = create_mock_db_reader();
db_reader
.expect_get_startup_info()
.returning(|| Ok(Some(create_startup_info())));

// Setup the mock db writer
let mut db_writer = create_mock_db_writer();
db_writer
Expand All @@ -359,10 +402,17 @@ async fn test_save_account_states_completion() {
db_writer.expect_delete_genesis().returning(|| Ok(()));

// Create the storage synchronizer
let (mut commit_listener, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
chunk_executor,
create_mock_reader_writer(None, Some(db_writer)),
);
let (mut commit_listener, _, _, _, mut storage_synchronizer, _, _) =
create_storage_synchronizer(
chunk_executor,
create_mock_reader_writer(Some(db_reader), Some(db_writer)),
);

// Subscribe to the expected event
let expected_event = output_list_with_proof.transactions_and_outputs[0]
.1
.events()[0]
.clone();

// Initialize the account synchronizer
let account_synchronizer_handle = storage_synchronizer
Expand All @@ -373,7 +423,7 @@ async fn test_save_account_states_completion() {
)
.unwrap();

// Save an account states chunk and verify we get a commit notification
// Save an account states chunk and verify we get an account commit notification
storage_synchronizer
.save_account_states(0, create_state_value_chunk_with_proof(false))
.unwrap();
Expand All @@ -383,14 +433,17 @@ async fn test_save_account_states_completion() {
storage_synchronizer
.save_account_states(1, create_state_value_chunk_with_proof(true))
.unwrap();

// Verify we get a commit notification
let expected_transaction = output_list_with_proof.transactions_and_outputs[0].0.clone();
let expected_committed_transactions = CommittedTransactions {
events: vec![],
transactions: vec![output_list_with_proof.transactions_and_outputs[0].0.clone()],
events: vec![expected_event.clone()],
transactions: vec![expected_transaction.clone()],
};
verify_account_commit_notification(
&mut commit_listener,
true,
Some(expected_committed_transactions),
Some(expected_committed_transactions.clone()),
)
.await;

Expand All @@ -417,7 +470,7 @@ async fn test_save_account_states_dropped_error_listener() {
.return_once(move |_, _| Ok(Box::new(snapshot_receiver)));

// Create the storage synchronizer (drop all listeners)
let (_, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
let (_, _, _, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, Some(db_writer)),
);
Expand Down Expand Up @@ -458,7 +511,7 @@ async fn test_save_account_states_invalid_chunk() {
.return_once(move |_, _| Ok(Box::new(snapshot_receiver)));

// Create the storage synchronizer
let (_, mut error_listener, mut storage_synchronizer, _, _) = create_storage_synchronizer(
let (_, mut error_listener, _, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, Some(db_writer)),
);
Expand All @@ -484,7 +537,7 @@ async fn test_save_account_states_invalid_chunk() {
#[should_panic]
fn test_save_account_states_without_initialize() {
// Create the storage synchronizer
let (_, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
let (_, _, _, _, mut storage_synchronizer, _, _) = create_storage_synchronizer(
create_mock_executor(),
create_mock_reader_writer(None, None),
);
Expand All @@ -501,6 +554,8 @@ fn create_storage_synchronizer(
) -> (
CommitNotificationListener,
ErrorNotificationListener,
Arc<Mutex<EventSubscriptionService>>,
MempoolNotificationListener,
StorageSynchronizer<MockChunkExecutor>,
JoinHandle<()>,
JoinHandle<()>,
Expand All @@ -512,19 +567,34 @@ fn create_storage_synchronizer(
CommitNotificationListener::new();
let (error_notification_sender, error_notification_listener) = ErrorNotificationListener::new();

// Create the event subscription service
let event_subscription_service = Arc::new(Mutex::new(EventSubscriptionService::new(
ON_CHAIN_CONFIG_REGISTRY,
Arc::new(RwLock::new(mock_reader_writer.clone())),
)));

// Create the mempool notification handler
let (mempool_notification_sender, mempool_notification_listener) =
mempool_notifications::new_mempool_notifier_listener_pair();
let mempool_notification_handler = MempoolNotificationHandler::new(mempool_notification_sender);

// Create the storage synchronizer
let (storage_synchronizer, executor_handle, committer_handle) = StorageSynchronizer::new(
StateSyncDriverConfig::default(),
Arc::new(mock_chunk_executor),
commit_notification_sender,
error_notification_sender,
event_subscription_service.clone(),
mempool_notification_handler,
mock_reader_writer,
None,
);

(
commit_notification_listener,
error_notification_listener,
event_subscription_service,
mempool_notification_listener,
storage_synchronizer,
executor_handle,
committer_handle,
Expand All @@ -537,39 +607,45 @@ async fn verify_account_commit_notification(
expected_all_accounts_synced: bool,
expected_committed_transactions: Option<CommittedTransactions>,
) {
match commit_listener.select_next_some().await {
CommitNotification::CommittedAccounts(committed_accounts) => {
assert_eq!(
committed_accounts.all_accounts_synced,
expected_all_accounts_synced
);
assert_eq!(
committed_accounts.committed_transaction,
expected_committed_transactions
);
}
commit_notification => panic!(
"Invalid commit notification received: {:?}",
commit_notification
),
}
let CommitNotification::CommittedAccounts(committed_accounts) =
commit_listener.select_next_some().await;
assert_eq!(
committed_accounts.all_accounts_synced,
expected_all_accounts_synced
);
assert_eq!(
committed_accounts.committed_transaction,
expected_committed_transactions
);
}

/// Verifies that the expected transaction commit notification is received by the listener
async fn verify_transaction_commit_notification(
commit_listener: &mut CommitNotificationListener,
/// Verifies that mempool is notified about the committed transactions and
/// verifies that the event listener is notified about the committed
/// events (if it exists).
async fn verify_mempool_and_event_notification(
event_listener: Option<&mut EventNotificationListener>,
mempool_notification_listener: &mut MempoolNotificationListener,
expected_transactions: Vec<Transaction>,
expected_events: Vec<ContractEvent>,
) {
match commit_listener.select_next_some().await {
CommitNotification::CommittedTransactions(committed_transactions) => {
assert_eq!(committed_transactions.transactions, expected_transactions);
assert_eq!(committed_transactions.events, expected_events);
}
commit_notification => panic!(
"Invalid commit notification received: {:?}",
commit_notification
),
// Verify mempool is notified and ack the notification
let mempool_notification = mempool_notification_listener.select_next_some().await;
let committed_transactions: Vec<CommittedTransaction> = expected_transactions
.into_iter()
.map(|txn| CommittedTransaction {
sender: txn.as_signed_user_txn().unwrap().sender(),
sequence_number: 0,
})
.collect();
assert_eq!(mempool_notification.transactions, committed_transactions);
mempool_notification_listener
.ack_commit_notification(mempool_notification)
.unwrap();

// Verify the event listener is notified about the specified event
if let Some(event_listener) = event_listener {
let event_notification = event_listener.select_next_some().await;
assert_eq!(event_notification.subscribed_events, expected_events);
}
}

Expand Down
Loading

0 comments on commit 042a43a

Please sign in to comment.