Skip to content

Commit

Permalink
[State Sync] Move mempool commit onto critical path.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Apr 26, 2022
1 parent 4c1d813 commit 64699ca
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 110 deletions.
111 changes: 27 additions & 84 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ impl<
consensus_notification_handler: ConsensusNotificationHandler,
driver_configuration: DriverConfiguration,
error_notification_listener: ErrorNotificationListener,
event_subscription_service: EventSubscriptionService,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
storage_synchronizer: StorageSyncer,
aptos_data_client: DataClient,
streaming_service_client: StreamingServiceClient,
storage: Arc<dyn DbReader>,
) -> Self {
let event_subscription_service = Arc::new(Mutex::new(event_subscription_service));
let bootstrapper = Bootstrapper::new(
driver_configuration.clone(),
streaming_service_client.clone(),
Expand Down Expand Up @@ -252,18 +251,17 @@ impl<
// TODO(joshlind): can we get consensus to forward the events?

// Handle the commit notification
let latest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
CommitNotification::handle_transaction_notification(
consensus_commit_notification.reconfiguration_events.clone(),
consensus_commit_notification.transactions.clone(),
latest_synced_version,
latest_synced_ledger_info,
let committed_transactions = CommittedTransactions {
events: consensus_commit_notification.reconfiguration_events.clone(),
transactions: consensus_commit_notification.transactions.clone(),
};
utils::handle_committed_transactions(
committed_transactions,
self.storage.clone(),
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await?;
.await;

// Respond to consensus successfully
self.consensus_notification_handler
Expand Down Expand Up @@ -348,79 +346,19 @@ impl<
}
}

/// Handles a commit notification sent by the storage synchronizer
/// Handles a commit notification sent by the storage synchronizer for new
/// accounts.
async fn handle_commit_notification(&mut self, commit_notification: CommitNotification) {
match commit_notification {
CommitNotification::CommittedAccounts(committed_accounts) => {
debug!(
LogSchema::new(LogEntry::SynchronizerNotification).message(&format!(
"Received an account commit notification from the storage synchronizer. \
let CommitNotification::CommittedAccounts(committed_accounts) = commit_notification;
debug!(
LogSchema::new(LogEntry::SynchronizerNotification).message(&format!(
"Received an account commit notification from the storage synchronizer. \
All synced: {:?}, last committed index: {:?}.",
committed_accounts.all_accounts_synced,
committed_accounts.last_committed_account_index,
))
);
self.handle_committed_accounts(committed_accounts).await;
}
CommitNotification::CommittedTransactions(committed_transactions) => {
debug!(
LogSchema::new(LogEntry::SynchronizerNotification).message(&format!(
"Received a transaction commit notification from the storage synchronizer! \
Transaction total: {:?}, event total: {:?}",
committed_transactions.transactions.len(),
committed_transactions.events.len()
))
);
self.handle_committed_transactions(committed_transactions)
.await;
}
}
}

/// Handles a notification sent by the storage synchronizer for committed transactions
async fn handle_committed_transactions(
&mut self,
committed_transactions: CommittedTransactions,
) {
// Fetch the latest synced version and ledger info from storage
let (latest_synced_version, latest_synced_ledger_info) =
match utils::fetch_latest_synced_version(self.storage.clone()) {
Ok(latest_synced_version) => {
match utils::fetch_latest_synced_ledger_info(self.storage.clone()) {
Ok(latest_synced_ledger_info) => {
(latest_synced_version, latest_synced_ledger_info)
}
Err(error) => {
error!(LogSchema::new(LogEntry::SynchronizerNotification)
.error(&error)
.message("Failed to fetch latest synced ledger info!"));
return;
}
}
}
Err(error) => {
error!(LogSchema::new(LogEntry::SynchronizerNotification)
.error(&error)
.message("Failed to fetch latest synced version!"));
return;
}
};

// Handle the commit notification
if let Err(error) = CommitNotification::handle_transaction_notification(
committed_transactions.events,
committed_transactions.transactions,
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await
{
error!(LogSchema::new(LogEntry::SynchronizerNotification)
.error(&error)
.message("Failed to handle a transaction commit notification!"));
}
committed_accounts.all_accounts_synced,
committed_accounts.last_committed_account_index,
))
);
self.handle_committed_accounts(committed_accounts).await;
}

/// Handles a notification sent by the storage synchronizer for committed accounts
Expand All @@ -444,8 +382,13 @@ impl<
.expect("Committed transaction should exist for last committed account chunk!");

// Handle the commit notification
self.handle_committed_transactions(committed_transactions)
.await;
utils::handle_committed_transactions(
committed_transactions,
self.storage.clone(),
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
};
use aptos_config::config::NodeConfig;
use aptos_data_client::aptosnet::AptosNetDataClient;
use aptos_infallible::Mutex;
use aptos_types::waypoint::Waypoint;
use consensus_notifications::ConsensusNotificationListener;
use data_streaming_service::streaming_client::StreamingServiceClient;
Expand Down Expand Up @@ -72,11 +73,14 @@ impl DriverFactory {
};

// Create the storage synchronizer
let event_subscription_service = Arc::new(Mutex::new(event_subscription_service));
let (storage_synchronizer, _, _) = StorageSynchronizer::new(
node_config.state_sync.state_sync_driver,
chunk_executor,
commit_notification_sender,
error_notification_sender,
event_subscription_service.clone(),
mempool_notification_handler.clone(),
storage.clone(),
driver_runtime.as_ref(),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const MEMPOOL_COMMIT_ACK_TIMEOUT_MS: u64 = 5000; // 5 seconds
#[derive(Clone, Debug)]
pub enum CommitNotification {
CommittedAccounts(CommittedAccounts),
CommittedTransactions(CommittedTransactions),
}

/// A commit notification for new account states
Expand Down Expand Up @@ -69,17 +68,6 @@ impl CommitNotification {
CommitNotification::CommittedAccounts(committed_accounts)
}

pub fn new_committed_transactions(
events: Vec<ContractEvent>,
transactions: Vec<Transaction>,
) -> Self {
let committed_transactions = CommittedTransactions {
events,
transactions,
};
CommitNotification::CommittedTransactions(committed_transactions)
}

/// Handles the commit notification by notifying mempool and the event
/// subscription service.
pub async fn handle_transaction_notification<M: MempoolNotificationSender>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use crate::{
error::Error,
logging::{LogEntry, LogSchema},
metrics,
notification_handlers::{CommitNotification, CommittedTransactions, ErrorNotification},
notification_handlers::{
CommitNotification, CommittedTransactions, ErrorNotification, MempoolNotificationHandler,
},
utils,
};
use aptos_config::config::StateSyncDriverConfig;
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
Expand All @@ -18,16 +21,18 @@ use aptos_types::{
},
};
use data_streaming_service::data_notification::NotificationId;
use event_notifications::EventSubscriptionService;
use executor_types::ChunkExecutorTrait;
use futures::{channel::mpsc, SinkExt, StreamExt};
use mempool_notifications::MempoolNotificationSender;
use std::{
future::Future,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use storage_interface::DbReaderWriter;
use storage_interface::{DbReader, DbReaderWriter};
use tokio::{
runtime::{Handle, Runtime},
task::{yield_now, JoinHandle},
Expand Down Expand Up @@ -91,7 +96,7 @@ pub struct StorageSynchronizer<ChunkExecutor> {
// The executor for transaction and transaction output chunks
chunk_executor: Arc<ChunkExecutor>,

// A channel through which to notify the driver of committed data
// A channel through which to notify the driver of committed account data
commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,

// The configuration of the state sync driver
Expand Down Expand Up @@ -136,11 +141,13 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> Clone for StorageSynchronizer<

impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecutor> {
/// Returns a new storage synchronizer alongside the executor and committer handles
pub fn new(
pub fn new<MempoolNotifier: MempoolNotificationSender>(
driver_config: StateSyncDriverConfig,
chunk_executor: Arc<ChunkExecutor>,
commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
storage: DbReaderWriter,
runtime: Option<&Runtime>,
) -> (Self, JoinHandle<()>, JoinHandle<()>) {
Expand Down Expand Up @@ -170,10 +177,12 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecu
let committer_handle = spawn_committer(
chunk_executor.clone(),
committer_listener,
commit_notification_sender.clone(),
error_notification_sender.clone(),
event_subscription_service,
mempool_notification_handler,
pending_transaction_chunks.clone(),
runtime.clone(),
storage.reader.clone(),
);

// Initialize the metric gauges
Expand Down Expand Up @@ -409,13 +418,18 @@ fn spawn_executor<ChunkExecutor: ChunkExecutorTrait + 'static>(
}

/// Spawns a dedicated committer that commits executed (but pending) chunks
fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
fn spawn_committer<
ChunkExecutor: ChunkExecutorTrait + 'static,
MempoolNotifier: MempoolNotificationSender,
>(
chunk_executor: Arc<ChunkExecutor>,
mut committer_listener: mpsc::Receiver<NotificationId>,
mut commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
pending_transaction_chunks: Arc<AtomicU64>,
runtime: Option<Handle>,
storage: Arc<dyn DbReader>,
) -> JoinHandle<()> {
// Create a committer
let committer = async move {
Expand All @@ -425,20 +439,34 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
// Commit the executed chunk
match chunk_executor.commit_chunk() {
Ok((events, transactions)) => {
// Update the metrics
// Log the event and update the metrics
debug!(
LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
"Committed a new transaction chunk! \
Transaction total: {:?}, event total: {:?}",
transactions.len(),
events.len()
))
);
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::SyncedTransactions
.get_label(),
transactions.len() as u64,
);

// Send a commit notification to the commit listener
let commit_notification = CommitNotification::new_committed_transactions(events, transactions);
if let Err(error) = commit_notification_sender.send(commit_notification).await {
let error = format!("Failed to send transaction commit notification! Error: {:?}", error);
send_storage_synchronizer_error(error_notification_sender.clone(), notification_id, error).await;
}
// Handle the committed transaction notification (e.g., notify mempool).
// We do this here due to synchronization issues with mempool and
// storage. See: https://github.com/aptos-labs/aptos-core/issues/553
let committed_transactions = CommittedTransactions {
events,
transactions
};
utils::handle_committed_transactions(committed_transactions,
storage.clone(),
mempool_notification_handler.clone(),
event_subscription_service.clone(),
).await;
}
Err(error) => {
let error = format!("Failed to commit executed chunk! Error: {:?}", error);
Expand Down
Loading

0 comments on commit 64699ca

Please sign in to comment.