Skip to content

Commit

Permalink
[mempool] refactoring mempool mock to support running in a tokio runtime
Browse files Browse the repository at this point in the history
Closes: #9276
  • Loading branch information
Xiao Li authored and bors-libra committed Sep 27, 2021
1 parent 2d5647f commit 22ffeab
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 57 deletions.
2 changes: 1 addition & 1 deletion consensus/src/twins/twins_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl SMRNode {

let (state_sync_client, state_sync) = mpsc::unbounded();
let (commit_cb_sender, commit_cb_receiver) = mpsc::unbounded::<LedgerInfoWithSignatures>();
let shared_mempool = MockSharedMempool::new(None);
let shared_mempool = MockSharedMempool::new();
let consensus_to_mempool_sender = shared_mempool.consensus_sender.clone();
let state_computer = Arc::new(MockStateComputer::new(
state_sync_client,
Expand Down
4 changes: 2 additions & 2 deletions mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ mod tests;
pub use shared_mempool::{
bootstrap, network,
types::{
ConsensusRequest, ConsensusResponse, MempoolClientSender, SubmissionStatus,
TransactionSummary,
ConsensusRequest, ConsensusResponse, MempoolClientSender, MempoolEventsReceiver,
SubmissionStatus, TransactionSummary,
},
};
#[cfg(any(test, feature = "fuzzing"))]
Expand Down
16 changes: 6 additions & 10 deletions mempool/src/shared_mempool/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ use crate::{
shared_mempool::{
coordinator::{coordinator, gc_coordinator, snapshot_job},
peer_manager::PeerManager,
types::{SharedMempool, SharedMempoolNotification},
types::{MempoolEventsReceiver, SharedMempool, SharedMempoolNotification},
},
ConsensusRequest, SubmissionStatus,
ConsensusRequest,
};
use anyhow::Result;
use diem_config::{config::NodeConfig, network_id::NetworkId};
use diem_infallible::{Mutex, RwLock};
use diem_types::{protocol_spec::DpnProto, transaction::SignedTransaction};
use diem_types::protocol_spec::DpnProto;
use event_notifications::ReconfigNotificationListener;
use futures::channel::{
mpsc::{self, Receiver, UnboundedSender},
oneshot,
};
use futures::channel::mpsc::{self, Receiver, UnboundedSender};
use mempool_notifications::MempoolNotificationListener;
use std::{collections::HashMap, sync::Arc};
use storage_interface::DbReader;
Expand All @@ -38,7 +34,7 @@ pub(crate) fn start_shared_mempool<V>(
// First element in tuple is the network ID.
// See `NodeConfig::is_upstream_peer` for the definition of network ID.
mempool_network_handles: Vec<(NetworkId, MempoolNetworkSender, MempoolNetworkEvents)>,
client_events: mpsc::Receiver<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>,
client_events: MempoolEventsReceiver,
consensus_requests: mpsc::Receiver<ConsensusRequest>,
mempool_listener: MempoolNotificationListener,
mempool_reconfig_events: ReconfigNotificationListener,
Expand Down Expand Up @@ -94,7 +90,7 @@ pub fn bootstrap(
// The first element in the tuple is the ID of the network that this network is a handle to.
// See `NodeConfig::is_upstream_peer` for the definition of network ID.
mempool_network_handles: Vec<(NetworkId, MempoolNetworkSender, MempoolNetworkEvents)>,
client_events: Receiver<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>,
client_events: MempoolEventsReceiver,
consensus_requests: Receiver<ConsensusRequest>,
mempool_listener: MempoolNotificationListener,
mempool_reconfig_events: ReconfigNotificationListener,
Expand Down
2 changes: 2 additions & 0 deletions mempool/src/shared_mempool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,5 @@ pub type SubmissionStatusBundle = (SignedTransaction, SubmissionStatus);

pub type MempoolClientSender =
mpsc::Sender<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>;
pub type MempoolEventsReceiver =
mpsc::Receiver<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>;
65 changes: 44 additions & 21 deletions mempool/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,71 @@ use diem_types::{
};
use event_notifications::EventSubscriptionService;
use futures::channel::{mpsc, oneshot};
use mempool_notifications::{self, MempoolNotificationListener, MempoolNotifier};
use mempool_notifications::{self, MempoolNotifier};
use network::{
peer_manager::{conn_notifs_channel, ConnectionRequestSender, PeerManagerRequestSender},
protocols::network::{NewNetworkEvents, NewNetworkSender},
};
use std::sync::Arc;
use storage_interface::{mock::MockDbReaderWriter, DbReaderWriter};
use tokio::runtime::{Builder, Runtime};
use tokio::runtime::{Builder, Handle, Runtime};
use vm_validator::mocks::mock_vm_validator::MockVMValidator;

/// Mock of a running instance of shared mempool.
pub struct MockSharedMempool {
_runtime: Runtime,
_runtime: Option<Runtime>,
_handle: Option<Handle>,
pub ac_client: mpsc::Sender<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>,
pub mempool: Arc<Mutex<CoreMempool>>,
pub consensus_sender: mpsc::Sender<ConsensusRequest>,
pub mempool_notifier: Option<MempoolNotifier>,
pub mempool_notifier: MempoolNotifier,
}

impl MockSharedMempool {
/// Creates a mock of a running instance of shared mempool.
/// Returns the runtime on which the shared mempool is running
/// and the channel through which shared mempool receives client events.
pub fn new(mempool_listener: Option<MempoolNotificationListener>) -> Self {
pub fn new() -> Self {
let runtime = Builder::new_multi_thread()
.thread_name("mock-shared-mem")
.enable_all()
.build()
.expect("[mock shared mempool] failed to create runtime");
let (ac_client, mempool, consensus_sender, mempool_notifier) =
Self::start(runtime.handle());
Self {
_runtime: Some(runtime),
_handle: None,
ac_client,
mempool,
consensus_sender,
mempool_notifier,
}
}

/// Creates a mock of a running instance of shared mempool inside a tokio runtime;
/// Holds a runtime handle instead.
pub fn new_in_runtime() -> Self {
let handle = Handle::current();
let (ac_client, mempool, consensus_sender, mempool_notifier) = Self::start(&handle);
Self {
_runtime: None,
_handle: Some(handle),
ac_client,
mempool,
consensus_sender,
mempool_notifier,
}
}

pub fn start(
handle: &Handle,
) -> (
mpsc::Sender<(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>)>,
Arc<Mutex<CoreMempool>>,
mpsc::Sender<ConsensusRequest>,
MempoolNotifier,
) {
let mut config = NodeConfig::random();
config.validator_network = Some(NetworkConfig::network_with_id(NetworkId::Validator));

Expand All @@ -67,14 +102,8 @@ impl MockSharedMempool {
let network_events = MempoolNetworkEvents::new(network_notifs_rx, conn_notifs_rx);
let (ac_client, client_events) = mpsc::channel(1_024);
let (consensus_sender, consensus_events) = mpsc::channel(1_024);
let (mempool_notifier, mempool_listener) = match mempool_listener {
None => {
let (mempool_notifier, mempool_listener) =
mempool_notifications::new_mempool_notifier_listener_pair();
(Some(mempool_notifier), mempool_listener)
}
Some(mempool_listener) => (None, mempool_listener),
};
let (mempool_notifier, mempool_listener) =
mempool_notifications::new_mempool_notifier_listener_pair();
let mut event_subscriber = EventSubscriptionService::new(
ON_CHAIN_CONFIG_REGISTRY,
Arc::new(RwLock::new(DbReaderWriter::new(MockDbReaderWriter))),
Expand All @@ -83,7 +112,7 @@ impl MockSharedMempool {
let network_handles = vec![(NetworkId::Validator, network_sender, network_events)];

start_shared_mempool(
runtime.handle(),
handle,
&config,
mempool.clone(),
network_handles,
Expand All @@ -96,13 +125,7 @@ impl MockSharedMempool {
vec![],
);

Self {
_runtime: runtime,
ac_client,
mempool,
consensus_sender,
mempool_notifier,
}
(ac_client, mempool, consensus_sender, mempool_notifier)
}

pub fn add_txns(&self, txns: Vec<SignedTransaction>) -> Result<()> {
Expand Down
9 changes: 4 additions & 5 deletions mempool/src/tests/shared_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::runtime::Builder;

#[test]
fn test_consensus_events_rejected_txns() {
let smp = MockSharedMempool::new(None);
let smp = MockSharedMempool::new();

// Add txns 1, 2, 3, 4
// Txn 1: committed successfully
Expand Down Expand Up @@ -59,9 +59,7 @@ fn test_mempool_notify_committed_txns() {
let _enter = runtime.enter();

// Create a new mempool notifier, listener and shared mempool
let (mempool_notifier, mempool_listener) =
mempool_notifications::new_mempool_notifier_listener_pair();
let smp = MockSharedMempool::new(Some(mempool_listener));
let smp = MockSharedMempool::new();

// Add txns 1, 2, 3, 4
// Txn 1: committed successfully
Expand All @@ -83,7 +81,8 @@ fn test_mempool_notify_committed_txns() {

let committed_txns = vec![Transaction::UserTransaction(committed_txn)];
block_on(async {
assert!(mempool_notifier
assert!(smp
.mempool_notifier
.notify_new_commit(committed_txns, 1, 1000)
.await
.is_ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn new_mempool_notifier_listener_pair() -> (MempoolNotifier, MempoolNotifica
}

/// The state sync component responsible for notifying mempool.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct MempoolNotifier {
notification_sender: mpsc::Sender<MempoolCommitNotification>,
}
Expand Down
6 changes: 3 additions & 3 deletions state-sync/state-sync-v1/tests/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ impl StateSyncEnvironment {
peer.signer.clone(),
)));

let (mempool_notifier, mempool_listener) =
mempool_notifications::new_mempool_notifier_listener_pair();
peer.mempool = Some(MockSharedMempool::new(Some(mempool_listener)));
let mempool = MockSharedMempool::new();
let mempool_notifier = mempool.mempool_notifier.clone();
peer.mempool = Some(mempool);

let (consensus_notifier, consensus_listener) =
consensus_notifications::new_consensus_notifier_listener_pair(
Expand Down
36 changes: 22 additions & 14 deletions vm-validator/src/mocks/mock_vm_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ use diem_types::{
};
use diem_vm::VMValidator;

pub const ACCOUNT_DNE_TEST_ADD: AccountAddress =
AccountAddress::new([0_u8; AccountAddress::LENGTH]);
pub const INVALID_SIG_TEST_ADD: AccountAddress =
AccountAddress::new([1_u8; AccountAddress::LENGTH]);
pub const INSUFFICIENT_BALANCE_TEST_ADD: AccountAddress =
AccountAddress::new([2_u8; AccountAddress::LENGTH]);
pub const SEQ_NUMBER_TOO_NEW_TEST_ADD: AccountAddress =
AccountAddress::new([3_u8; AccountAddress::LENGTH]);
pub const SEQ_NUMBER_TOO_OLD_TEST_ADD: AccountAddress =
AccountAddress::new([4_u8; AccountAddress::LENGTH]);
pub const TXN_EXPIRATION_TIME_TEST_ADD: AccountAddress =
AccountAddress::new([5_u8; AccountAddress::LENGTH]);
pub const INVALID_AUTH_KEY_TEST_ADD: AccountAddress =
AccountAddress::new([6_u8; AccountAddress::LENGTH]);

#[derive(Clone)]
pub struct MockVMValidator;

Expand Down Expand Up @@ -40,26 +55,19 @@ impl TransactionValidation for MockVMValidator {
};

let sender = txn.sender();
let account_dne_test_add = AccountAddress::new([0_u8; AccountAddress::LENGTH]);
let invalid_sig_test_add = AccountAddress::new([1_u8; AccountAddress::LENGTH]);
let insufficient_balance_test_add = AccountAddress::new([2_u8; AccountAddress::LENGTH]);
let seq_number_too_new_test_add = AccountAddress::new([3_u8; AccountAddress::LENGTH]);
let seq_number_too_old_test_add = AccountAddress::new([4_u8; AccountAddress::LENGTH]);
let txn_expiration_time_test_add = AccountAddress::new([5_u8; AccountAddress::LENGTH]);
let invalid_auth_key_test_add = AccountAddress::new([6_u8; AccountAddress::LENGTH]);
let ret = if sender == account_dne_test_add {
let ret = if sender == ACCOUNT_DNE_TEST_ADD {
Some(StatusCode::SENDING_ACCOUNT_DOES_NOT_EXIST)
} else if sender == invalid_sig_test_add {
} else if sender == INVALID_SIG_TEST_ADD {
Some(StatusCode::INVALID_SIGNATURE)
} else if sender == insufficient_balance_test_add {
} else if sender == INSUFFICIENT_BALANCE_TEST_ADD {
Some(StatusCode::INSUFFICIENT_BALANCE_FOR_TRANSACTION_FEE)
} else if sender == seq_number_too_new_test_add {
} else if sender == SEQ_NUMBER_TOO_NEW_TEST_ADD {
Some(StatusCode::SEQUENCE_NUMBER_TOO_NEW)
} else if sender == seq_number_too_old_test_add {
} else if sender == SEQ_NUMBER_TOO_OLD_TEST_ADD {
Some(StatusCode::SEQUENCE_NUMBER_TOO_OLD)
} else if sender == txn_expiration_time_test_add {
} else if sender == TXN_EXPIRATION_TIME_TEST_ADD {
Some(StatusCode::TRANSACTION_EXPIRED)
} else if sender == invalid_auth_key_test_add {
} else if sender == INVALID_AUTH_KEY_TEST_ADD {
Some(StatusCode::INVALID_AUTH_KEY)
} else {
None
Expand Down

0 comments on commit 22ffeab

Please sign in to comment.