Skip to content

Commit

Permalink
[State Sync] Return the join handles for the synchronizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Apr 22, 2022
1 parent a4e8c42 commit 9365007
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
let epoch_change_proofs = self.verified_epoch_states.all_epoch_ending_ledger_infos();

// Initialize the account state synchronizer
self.storage_synchronizer.initialize_account_synchronizer(
let _ = self.storage_synchronizer.initialize_account_synchronizer(
epoch_change_proofs,
ledger_info_to_sync,
transaction_output_to_sync.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl DriverFactory {
};

// Create the storage synchronizer
let storage_synchronizer = StorageSynchronizer::new(
let (storage_synchronizer, _, _) = StorageSynchronizer::new(
node_config.state_sync.state_sync_driver,
chunk_executor,
commit_notification_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{
use storage_interface::DbReaderWriter;
use tokio::{
runtime::{Handle, Runtime},
task::yield_now,
task::{yield_now, JoinHandle},
};

/// Synchronizes the storage of the node by verifying and storing new data
Expand Down Expand Up @@ -60,7 +60,7 @@ pub trait StorageSynchronizerInterface {

/// Initializes an account synchronizer with the specified
/// `target_ledger_info` and `target_output_with_proof` at the target
/// syncing version. Also, writes all `epoch_change_proofs` to storage.
/// syncing version. Returns a join handle to the account synchronizer.
///
/// Note: this assumes that `epoch_change_proofs`, `target_ledger_info`,
/// and `target_output_with_proof` have already been verified.
Expand All @@ -69,7 +69,7 @@ pub trait StorageSynchronizerInterface {
epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
target_ledger_info: LedgerInfoWithSignatures,
target_output_with_proof: TransactionOutputListWithProof,
) -> Result<(), Error>;
) -> Result<JoinHandle<()>, Error>;

/// Returns true iff there is storage data that is still waiting
/// to be executed/applied or committed.
Expand Down Expand Up @@ -135,14 +135,15 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> Clone for StorageSynchronizer<
}

impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecutor> {
/// Returns a new storage synchronizer alongside the executor and committer handles
pub fn new(
driver_config: StateSyncDriverConfig,
chunk_executor: Arc<ChunkExecutor>,
commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
storage: DbReaderWriter,
runtime: Option<&Runtime>,
) -> Self {
) -> (Self, JoinHandle<()>, JoinHandle<()>) {
// Create a channel to notify the executor when data chunks are ready
let max_pending_data_chunks = driver_config.max_pending_data_chunks as usize;
let (executor_notifier, executor_listener) = mpsc::channel(max_pending_data_chunks);
Expand All @@ -155,7 +156,7 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecu

// Spawn the executor that executes/applies storage data chunks
let runtime = runtime.map(|runtime| runtime.handle().clone());
spawn_executor(
let executor_handle = spawn_executor(
chunk_executor.clone(),
error_notification_sender.clone(),
executor_listener,
Expand All @@ -166,7 +167,7 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecu
);

// Spawn the committer that commits executed (but pending) chunks
spawn_committer(
let committer_handle = spawn_committer(
chunk_executor.clone(),
committer_listener,
commit_notification_sender.clone(),
Expand All @@ -179,7 +180,7 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecu
utils::initialize_sync_version_gauges(storage.reader.clone())
.expect("Failed to initialize the metric gauges!");

Self {
let storage_synchronizer = Self {
chunk_executor,
commit_notification_sender,
driver_config,
Expand All @@ -189,7 +190,9 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizer<ChunkExecu
runtime,
state_snapshot_notifier: None,
storage,
}
};

(storage_synchronizer, executor_handle, committer_handle)
}

/// Notifies the executor of new data chunks
Expand Down Expand Up @@ -246,14 +249,14 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizerInterface
epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
target_ledger_info: LedgerInfoWithSignatures,
target_output_with_proof: TransactionOutputListWithProof,
) -> Result<(), Error> {
) -> Result<JoinHandle<()>, Error> {
// Create a channel to notify the state snapshot receiver when data chunks are ready
let max_pending_data_chunks = self.driver_config.max_pending_data_chunks as usize;
let (state_snapshot_notifier, state_snapshot_listener) =
mpsc::channel(max_pending_data_chunks);

// Spawn the state snapshot receiver that commits account states
spawn_state_snapshot_receiver(
let receiver_handle = spawn_state_snapshot_receiver(
self.chunk_executor.clone(),
state_snapshot_listener,
self.commit_notification_sender.clone(),
Expand All @@ -267,7 +270,7 @@ impl<ChunkExecutor: ChunkExecutorTrait + 'static> StorageSynchronizerInterface
);
self.state_snapshot_notifier = Some(state_snapshot_notifier);

Ok(())
Ok(receiver_handle)
}

fn pending_storage_data(&self) -> bool {
Expand Down Expand Up @@ -326,7 +329,7 @@ fn spawn_executor<ChunkExecutor: ChunkExecutorTrait + 'static>(
pending_transaction_chunks: Arc<AtomicU64>,
max_pending_data_chunks: u64,
runtime: Option<Handle>,
) {
) -> JoinHandle<()> {
// Create an executor
let executor = async move {
loop {
Expand Down Expand Up @@ -402,7 +405,7 @@ fn spawn_executor<ChunkExecutor: ChunkExecutorTrait + 'static>(
};

// Spawn the executor
spawn(runtime, executor);
spawn(runtime, executor)
}

/// Spawns a dedicated committer that commits executed (but pending) chunks
Expand All @@ -413,7 +416,7 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
pending_transaction_chunks: Arc<AtomicU64>,
runtime: Option<Handle>,
) {
) -> JoinHandle<()> {
// Create a committer
let committer = async move {
loop {
Expand Down Expand Up @@ -449,7 +452,7 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
};

// Spawn the committer
spawn(runtime, committer);
spawn(runtime, committer)
}

/// Spawns a dedicated receiver that commits accounts from a state snapshot
Expand All @@ -464,7 +467,7 @@ fn spawn_state_snapshot_receiver<ChunkExecutor: ChunkExecutorTrait + 'static>(
target_ledger_info: LedgerInfoWithSignatures,
target_output_with_proof: TransactionOutputListWithProof,
runtime: Option<Handle>,
) {
) -> JoinHandle<()> {
// Create a state snapshot receiver
let receiver = async move {
// Get the target version and expected root hash
Expand Down Expand Up @@ -495,7 +498,7 @@ fn spawn_state_snapshot_receiver<ChunkExecutor: ChunkExecutorTrait + 'static>(
let all_accounts_synced = account_states_with_proof.is_last_chunk();
let last_committed_account_index = account_states_with_proof.last_index;

// Attempt to the commit the chunk
// Attempt to commit the chunk
let commit_result = state_snapshot_receiver.add_chunk(
account_states_with_proof.raw_values,
account_states_with_proof.proof.clone(),
Expand Down Expand Up @@ -566,7 +569,7 @@ fn spawn_state_snapshot_receiver<ChunkExecutor: ChunkExecutorTrait + 'static>(
};

// Spawn the receiver
spawn(runtime, receiver);
spawn(runtime, receiver)
}

/// Creates a final commit notification for the last account states chunk
Expand Down Expand Up @@ -597,11 +600,14 @@ fn create_final_commit_notification(

/// Spawns a future on a specified runtime. If no runtime is specified, uses
/// the current runtime.
fn spawn(runtime: Option<Handle>, future: impl Future<Output = ()> + Send + 'static) {
fn spawn(
runtime: Option<Handle>,
future: impl Future<Output = ()> + Send + 'static,
) -> JoinHandle<()> {
if let Some(runtime) = runtime {
runtime.spawn(future);
runtime.spawn(future)
} else {
tokio::spawn(future);
tokio::spawn(future)
}
}

Expand Down

0 comments on commit 9365007

Please sign in to comment.