Skip to content

Commit

Permalink
[State Sync] Pass a storage writer to the synchronizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 18, 2022
1 parent 4d446de commit 0c9042d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
state_sync_network_handles,
mempool_notifier,
consensus_listener,
db_rw.reader,
db_rw,
chunk_executor,
node_config,
waypoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use executor_types::ChunkExecutorTrait;
use futures::channel::mpsc;
use mempool_notifications::MempoolNotificationSender;
use std::sync::Arc;
use storage_interface::DbReader;
use storage_interface::DbReaderWriter;
use tokio::runtime::{Builder, Runtime};

/// Creates a new state sync driver and client
Expand All @@ -38,7 +38,7 @@ impl DriverFactory {
create_runtime: bool,
node_config: &NodeConfig,
waypoint: Waypoint,
storage: Arc<dyn DbReader>,
storage: DbReaderWriter,
chunk_executor: Arc<ChunkExecutor>,
mempool_notification_sender: MempoolNotifier,
consensus_listener: ConsensusNotificationListener,
Expand Down Expand Up @@ -76,6 +76,7 @@ impl DriverFactory {
chunk_executor,
commit_notification_sender,
error_notification_sender,
storage.writer,
driver_runtime.as_ref(),
);

Expand All @@ -98,7 +99,7 @@ impl DriverFactory {
storage_synchronizer,
aptos_data_client,
streaming_service_client,
storage,
storage.reader,
);

// Spawn the driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
Arc,
},
};
use storage_interface::DbWriter;
use tokio::runtime::Runtime;

// TODO(joshlind): add structured logging support!
Expand Down Expand Up @@ -72,13 +73,17 @@ pub struct StorageSynchronizer {

// The number of transaction data chunks pending execute/apply, or commit
pending_transaction_chunks: Arc<AtomicU64>,

// The writer to storage (required for account state syncing)
storage: Arc<dyn DbWriter>,
}

impl StorageSynchronizer {
pub fn new<ChunkExecutor: ChunkExecutorTrait + 'static>(
chunk_executor: Arc<ChunkExecutor>,
commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
storage: Arc<dyn DbWriter>,
runtime: Option<&Runtime>,
) -> Self {
// Create a channel to notify the executor when transaction data chunks are ready
Expand Down Expand Up @@ -113,6 +118,7 @@ impl StorageSynchronizer {
Self {
executor_notifier,
pending_transaction_chunks,
storage,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn create_driver_for_tests(
false,
&node_config,
waypoint,
db_rw.reader,
db_rw,
chunk_executor,
mempool_notifier,
consensus_listener,
Expand Down
10 changes: 5 additions & 5 deletions state-sync/state-sync-v2/state-sync-multiplexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use state_sync_v1::{
network::{StateSyncEvents, StateSyncSender},
};
use std::sync::Arc;
use storage_interface::DbReader;
use storage_interface::DbReaderWriter;
use tokio::runtime::Runtime;

/// A struct for holding the various runtimes required by state sync v2.
Expand Down Expand Up @@ -68,7 +68,7 @@ impl StateSyncMultiplexer {
network: Vec<(NetworkId, StateSyncSender, StateSyncEvents)>,
mempool_notifier: MempoolNotifier,
consensus_listener: ConsensusNotificationListener,
storage: Arc<dyn DbReader>,
storage: DbReaderWriter,
chunk_executor: Arc<ChunkExecutor>,
node_config: &NodeConfig,
waypoint: Waypoint,
Expand All @@ -77,7 +77,7 @@ impl StateSyncMultiplexer {
streaming_service_client: StreamingServiceClient,
) -> Self {
// Notify subscribers of the initial on-chain config values
match (&*storage).fetch_synced_version() {
match (&*storage.reader).fetch_synced_version() {
Ok(synced_version) => {
if let Err(error) =
event_subscription_service.notify_initial_configs(synced_version)
Expand Down Expand Up @@ -119,7 +119,7 @@ impl StateSyncMultiplexer {
network,
mempool_notifier,
consensus_listener,
storage,
storage.reader,
chunk_executor,
node_config,
waypoint,
Expand Down Expand Up @@ -243,7 +243,7 @@ mod tests {
vec![],
mempool_notifier,
consensus_listener,
db_rw.reader.clone(),
db_rw.clone(),
Arc::new(ChunkExecutor::<AptosVM>::new(db_rw).unwrap()),
&node_config,
Waypoint::new_any(&LedgerInfo::new(BlockInfo::empty(), HashValue::random())),
Expand Down

0 comments on commit 0c9042d

Please sign in to comment.