Skip to content

Commit

Permalink
[State Sync] Add a new variant for account state commits to commit no…
Browse files Browse the repository at this point in the history
…tifications.
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 18, 2022
1 parent 0c9042d commit f6c0b4a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 42 deletions.
14 changes: 12 additions & 2 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
driver::DriverConfiguration, error::Error, storage_synchronizer::StorageSynchronizerInterface,
utils, utils::SpeculativeStreamState,
driver::DriverConfiguration, error::Error, notification_handlers::CommittedAccounts,
storage_synchronizer::StorageSynchronizerInterface, utils, utils::SpeculativeStreamState,
};
use aptos_config::config::BootstrappingMode;
use aptos_data_client::GlobalDataSummary;
Expand Down Expand Up @@ -819,6 +819,16 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await
}

/// Handles a notification from the driver that new accounts have been
/// committed to storage.
pub fn handle_committed_accounts(
&mut self,
_committed_accounts: CommittedAccounts,
) -> Result<(), Error> {
// TODO(joshlind): implement me!
unimplemented!();
}

/// Returns the speculative stream state. Assumes that the state exists.
fn get_speculative_stream_state(&mut self) -> &mut SpeculativeStreamState {
self.speculative_stream_state
Expand Down
89 changes: 61 additions & 28 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::{
driver_client::{ClientNotificationListener, DriverNotification},
error::Error,
notification_handlers::{
CommitNotification, CommitNotificationListener, ConsensusNotificationHandler,
ErrorNotification, ErrorNotificationListener, MempoolNotificationHandler,
CommitNotification, CommitNotificationListener, CommittedAccounts, CommittedTransactions,
ConsensusNotificationHandler, ErrorNotification, ErrorNotificationListener,
MempoolNotificationHandler,
},
storage_synchronizer::StorageSynchronizerInterface,
utils,
Expand Down Expand Up @@ -249,25 +250,21 @@ impl<
consensus_commit_notification.reconfiguration_events.len()
);

// Create a commit notification
// TODO(joshlind): can we get consensus to forward the events?
let commit_notification = CommitNotification::new(
consensus_commit_notification.reconfiguration_events.clone(),
consensus_commit_notification.transactions.clone(),
);

// Handle the commit notification
let latest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
commit_notification
.handle_commit_notification(
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await?;
CommitNotification::handle_transaction_notification(
consensus_commit_notification.reconfiguration_events.clone(),
consensus_commit_notification.transactions.clone(),
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await?;

// Respond to consensus successfully
self.consensus_notification_handler
Expand Down Expand Up @@ -320,11 +317,30 @@ impl<

/// Handles a commit notification sent by the storage synchronizer
async fn handle_commit_notification(&mut self, commit_notification: CommitNotification) {
debug!("Received a commit notification from the storage synchronizer! Transaction total: {:?}, event total: {:?}",
commit_notification.transactions.len(),
commit_notification.events.len()
);
match commit_notification {
CommitNotification::CommittedAccounts(committed_accounts) => {
debug!(
"Received an account commit notification from the storage synchronizer: {:?}",
committed_accounts
);
self.handle_committed_accounts(committed_accounts).await;
}
CommitNotification::CommittedTransactions(committed_transactions) => {
debug!("Received a transaction commit notification from the storage synchronizer! Transaction total: {:?}, event total: {:?}",
committed_transactions.transactions.len(),
committed_transactions.events.len()
);
self.handle_committed_transactions(committed_transactions)
.await;
}
}
}

/// Handles a notification sent by the storage synchronizer for committed transactions
async fn handle_committed_transactions(
&mut self,
committed_transactions: CommittedTransactions,
) {
// Fetch the latest synced version and ledger info from storage
let (latest_synced_version, latest_synced_ledger_info) =
match utils::fetch_latest_synced_version(self.storage.clone()) {
Expand All @@ -349,16 +365,20 @@ impl<
};

// Handle the commit notification
if let Err(error) = commit_notification
.handle_commit_notification(
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await
if let Err(error) = CommitNotification::handle_transaction_notification(
committed_transactions.events,
committed_transactions.transactions,
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await
{
error!("Failed to handle a commit notification! Error: {:?}", error);
error!(
"Failed to handle a transaction commit notification! Error: {:?}",
error
);
}

// Update the last commit timestamp for the sync request
Expand All @@ -370,6 +390,19 @@ impl<
};
}

/// Handles a notification sent by the storage synchronizer for committed accounts
async fn handle_committed_accounts(&mut self, committed_accounts: CommittedAccounts) {
if let Err(error) = self
.bootstrapper
.handle_committed_accounts(committed_accounts)
{
error!(
"Failed to handle an account commit notification! Error: {:?}",
error
);
}
}

/// Handles an error notification sent by the storage synchronizer
async fn handle_error_notification(&mut self, error_notification: ErrorNotification) {
debug!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,55 @@ use std::{
const CONSENSUS_SYNC_REQUEST_TIMEOUT_MS: u64 = 60000; // 1 minute
const MEMPOOL_COMMIT_ACK_TIMEOUT_MS: u64 = 5000; // 5 seconds

/// A notification for new transactions and events that have been committed to
/// storage.
pub struct CommitNotification {
/// A notification for new data that has been committed to storage
#[derive(Debug)]
pub enum CommitNotification {
CommittedAccounts(CommittedAccounts),
CommittedTransactions(CommittedTransactions),
}

/// A commit notification for new account states
#[derive(Debug)]
pub struct CommittedAccounts {
pub all_accounts_synced: bool,
pub last_committed_account_index: u64,
}

/// A commit notification for new transactions
#[derive(Debug)]
pub struct CommittedTransactions {
pub events: Vec<ContractEvent>,
pub transactions: Vec<Transaction>,
}

impl CommitNotification {
pub fn new(events: Vec<ContractEvent>, transactions: Vec<Transaction>) -> Self {
Self {
pub fn new_committed_accounts(
all_accounts_synced: bool,
last_committed_account_index: u64,
) -> Self {
let committed_accounts = CommittedAccounts {
all_accounts_synced,
last_committed_account_index,
};
CommitNotification::CommittedAccounts(committed_accounts)
}

pub fn new_committed_transactions(
events: Vec<ContractEvent>,
transactions: Vec<Transaction>,
) -> Self {
let committed_transactions = CommittedTransactions {
events,
transactions,
}
};
CommitNotification::CommittedTransactions(committed_transactions)
}

/// Handles the commit notification by notifying mempool and the event
/// subscription service.
pub async fn handle_commit_notification<M: MempoolNotificationSender>(
&self,
pub async fn handle_transaction_notification<M: MempoolNotificationSender>(
events: Vec<ContractEvent>,
transactions: Vec<Transaction>,
latest_synced_version: Version,
latest_synced_ledger_info: LedgerInfoWithSignatures,
mut mempool_notification_handler: MempoolNotificationHandler<M>,
Expand All @@ -60,19 +90,18 @@ impl CommitNotification {
let blockchain_timestamp_usecs = latest_synced_ledger_info.ledger_info().timestamp_usecs();
mempool_notification_handler
.notify_mempool_of_committed_transactions(
self.transactions.clone(),
transactions.clone(),
blockchain_timestamp_usecs,
)
.await?;

// Notify the event subscription service of the events
debug!(
"Notifying the event subscription service of events at version: {:?}",
latest_synced_version
);
event_subscription_service
.lock()
.notify_events(latest_synced_version, self.events.clone())
.notify_events(latest_synced_version, events.clone())
.map_err(|error| error.into())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
// Commit the executed chunk
match chunk_executor.commit_chunk() {
Ok((events, transactions)) => {
let commit_notification = CommitNotification::new(events, transactions);
let commit_notification = CommitNotification::new_committed_transactions(events, transactions);
if let Err(error) = commit_notification_sender.send(commit_notification).await {
let error = format!("Failed to send commit notification! Error: {:?}", error);
send_storage_synchronizer_error(error_notification_sender.clone(), notification_id, error).await;
Expand Down

0 comments on commit f6c0b4a

Please sign in to comment.