Skip to content

Commit

Permalink
[State Sync] Add storage service notifications.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 10, 2023
1 parent 82af14e commit 05d2b72
Show file tree
Hide file tree
Showing 23 changed files with 593 additions and 87 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ members = [
"state-sync/inter-component/consensus-notifications",
"state-sync/inter-component/event-notifications",
"state-sync/inter-component/mempool-notifications",
"state-sync/inter-component/storage-service-notifications",
"state-sync/state-sync-v2/data-streaming-service",
"state-sync/state-sync-v2/state-sync-driver",
"state-sync/storage-service/client",
Expand Down Expand Up @@ -367,6 +368,7 @@ aptos-state-sync-driver = { path = "state-sync/state-sync-v2/state-sync-driver"
aptos-state-view = { path = "storage/state-view" }
aptos-storage-interface = { path = "storage/storage-interface" }
aptos-storage-service-client = { path = "state-sync/storage-service/client" }
aptos-storage-service-notifications = { path = "state-sync/inter-component/storage-service-notifications" }
aptos-storage-service-types = { path = "state-sync/storage-service/types" }
aptos-storage-service-server = { path = "state-sync/storage-service/server" }
aptos-telemetry = { path = "crates/aptos-telemetry" }
Expand Down
1 change: 1 addition & 0 deletions aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ aptos-state-sync-driver = { workspace = true }
aptos-state-view = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-storage-service-client = { workspace = true }
aptos-storage-service-notifications = { workspace = true }
aptos-storage-service-server = { workspace = true }
aptos-storage-service-types = { workspace = true }
aptos-telemetry = { workspace = true }
Expand Down
27 changes: 17 additions & 10 deletions aptos-node/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use aptos_state_sync_driver::{
};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_storage_service_client::StorageServiceClient;
use aptos_storage_service_notifications::StorageServiceNotificationListener;
use aptos_storage_service_server::{
network::StorageServiceNetworkEvents, storage::StorageReader, StorageServiceServer,
};
Expand Down Expand Up @@ -88,16 +89,8 @@ pub fn start_state_sync_and_get_notification_handles(
let network_client = storage_network_interfaces.network_client;
let network_service_events = storage_network_interfaces.network_service_events;

// Start the state sync storage service
let peers_and_metadata = network_client.get_peers_and_metadata();
let storage_service_runtime = setup_state_sync_storage_service(
node_config.state_sync.storage_service,
peers_and_metadata,
network_service_events,
&db_rw,
)?;

// Start the data client
let peers_and_metadata = network_client.get_peers_and_metadata();
let (aptos_data_client, aptos_data_client_runtime) =
setup_aptos_data_client(node_config, network_client, db_rw.reader.clone())?;

Expand All @@ -109,7 +102,7 @@ pub fn start_state_sync_and_get_notification_handles(
let chunk_executor = Arc::new(ChunkExecutor::<AptosVM>::new(db_rw.clone()));
let metadata_storage = PersistentMetadataStorage::new(&node_config.storage.dir());

// Create notification senders and listeners for mempool and consensus
// Create notification senders and listeners for mempool, consensus and the storage service
let (mempool_notifier, mempool_listener) =
aptos_mempool_notifications::new_mempool_notifier_listener_pair();
let (consensus_notifier, consensus_listener) =
Expand All @@ -119,6 +112,17 @@ pub fn start_state_sync_and_get_notification_handles(
.state_sync_driver
.commit_notification_timeout_ms,
);
let (storage_service_notifier, storage_service_listener) =
aptos_storage_service_notifications::new_storage_service_notifier_listener_pair();

// Start the state sync storage service
let storage_service_runtime = setup_state_sync_storage_service(
node_config.state_sync.storage_service,
peers_and_metadata,
network_service_events,
&db_rw,
storage_service_listener,
)?;

// Create the state sync driver factory
let state_sync = DriverFactory::create_and_spawn_driver(
Expand All @@ -128,6 +132,7 @@ pub fn start_state_sync_and_get_notification_handles(
db_rw,
chunk_executor,
mempool_notifier,
storage_service_notifier,
metadata_storage,
consensus_listener,
event_subscription_service,
Expand Down Expand Up @@ -201,6 +206,7 @@ fn setup_state_sync_storage_service(
peers_and_metadata: Arc<PeersAndMetadata>,
network_service_events: NetworkServiceEvents<StorageServiceMessage>,
db_rw: &DbReaderWriter,
storage_service_listener: StorageServiceNotificationListener,
) -> anyhow::Result<Runtime> {
// Create a new state sync storage service runtime
let storage_service_runtime = aptos_runtimes::spawn_named_runtime("stor-server".into(), None);
Expand All @@ -214,6 +220,7 @@ fn setup_state_sync_storage_service(
TimeService::real(),
peers_and_metadata,
StorageServiceNetworkEvents::new(network_service_events),
storage_service_listener,
);
storage_service_runtime.spawn(service.start());

Expand Down
4 changes: 2 additions & 2 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
cognitive-complexity-threshold = 100
# types are used for safety encoding
type-complexity-threshold = 10000
# manipulating complex states machines in consensus
too-many-arguments-threshold = 13
# The state sync driver requires a lot of wiring and channel handles
too-many-arguments-threshold = 14
# Reasonably large enum variants are okay
enum-variant-size-threshold = 1000
2 changes: 1 addition & 1 deletion config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Default for StorageServiceConfig {
max_transaction_output_chunk_size: MAX_TRANSACTION_OUTPUT_CHUNK_SIZE,
min_time_to_ignore_peers_secs: 300, // 5 minutes
request_moderator_refresh_interval_ms: 1000, // 1 second
storage_summary_refresh_interval_ms: 50,
storage_summary_refresh_interval_ms: 500,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "aptos-storage-service-notifications"
description = "The notification interface between state sync and the storage service"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
aptos-channels = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
aptos-crypto = { workspace = true }
claims = { workspace = true }
tokio = { workspace = true }
181 changes: 181 additions & 0 deletions state-sync/inter-component/storage-service-notifications/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use async_trait::async_trait;
use futures::{stream::FusedStream, Stream};
use serde::{Deserialize, Serialize};
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;

// Note: we limit the queue depth to 1 because it doesn't make sense for the storage service
// to execute for every notification (because it reads the latest version in the DB). Thus,
// if there are X pending notifications, the first one will refresh using the latest DB and
// the next X-1 will execute with an unchanged DB (thus, becoming a no-op and wasting the CPU).
const STORAGE_SERVICE_NOTIFICATION_CHANNEL_SIZE: usize = 1;

#[derive(Clone, Debug, Deserialize, Error, PartialEq, Eq, Serialize)]
pub enum Error {
#[error("Commit notification failed: {0}")]
CommitNotificationError(String),
}

/// The interface between the state sync driver and the storage service, allowing the driver
/// to notify the storage service of events (e.g., newly committed transactions).
#[async_trait]
pub trait StorageServiceNotificationSender: Send + Clone + Sync + 'static {
/// Notify the storage service of newly committed transactions
/// at the specified version.
async fn notify_new_commit(&self, highest_synced_version: u64) -> Result<(), Error>;
}

/// This method returns a (StorageServiceNotifier, StorageServiceNotificationListener) pair
/// that can be used to allow state sync and the storage service to communicate.
///
/// Note: the driver should take the notifier and the storage service should take the listener.
pub fn new_storage_service_notifier_listener_pair(
) -> (StorageServiceNotifier, StorageServiceNotificationListener) {
// Create a dedicated channel for notifications
let (notification_sender, notification_receiver) = aptos_channel::new(
QueueStyle::LIFO,
STORAGE_SERVICE_NOTIFICATION_CHANNEL_SIZE,
None,
);

// Create a notification sender and listener
let storage_service_notifier = StorageServiceNotifier::new(notification_sender);
let storage_service_listener = StorageServiceNotificationListener::new(notification_receiver);

(storage_service_notifier, storage_service_listener)
}

/// The state sync driver component responsible for notifying the storage service
#[derive(Clone, Debug)]
pub struct StorageServiceNotifier {
notification_sender: aptos_channel::Sender<(), StorageServiceCommitNotification>,
}

impl StorageServiceNotifier {
fn new(
notification_sender: aptos_channel::Sender<(), StorageServiceCommitNotification>,
) -> Self {
Self {
notification_sender,
}
}
}

#[async_trait]
impl StorageServiceNotificationSender for StorageServiceNotifier {
async fn notify_new_commit(&self, highest_synced_version: u64) -> Result<(), Error> {
// Create a new commit notification
let commit_notification = StorageServiceCommitNotification {
highest_synced_version,
};

// Send the notification to the storage service
if let Err(error) = self
.notification_sender
.clone()
.push((), commit_notification)
{
return Err(Error::CommitNotificationError(format!(
"Failed to notify the storage service of committed transactions! Error: {:?}",
error
)));
}

Ok(())
}
}

/// The storage service component responsible for handling state sync notifications
#[derive(Debug)]
pub struct StorageServiceNotificationListener {
notification_receiver: aptos_channel::Receiver<(), StorageServiceCommitNotification>,
}

impl StorageServiceNotificationListener {
fn new(
notification_receiver: aptos_channel::Receiver<(), StorageServiceCommitNotification>,
) -> Self {
StorageServiceNotificationListener {
notification_receiver,
}
}
}

impl Stream for StorageServiceNotificationListener {
type Item = StorageServiceCommitNotification;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().notification_receiver).poll_next(cx)
}
}

impl FusedStream for StorageServiceNotificationListener {
fn is_terminated(&self) -> bool {
self.notification_receiver.is_terminated()
}
}

/// A notification for newly committed transactions sent
/// by the state sync driver to the storage service.
#[derive(Debug)]
pub struct StorageServiceCommitNotification {
pub highest_synced_version: u64, // The new highest synced version
}

impl fmt::Display for StorageServiceCommitNotification {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"StorageServiceCommitNotification [highest_synced_version: {}]",
self.highest_synced_version,
)
}
}

#[cfg(test)]
mod tests {
use crate::{
new_storage_service_notifier_listener_pair, Error, StorageServiceNotificationSender,
};
use claims::assert_matches;
use futures::StreamExt;

#[tokio::test]
async fn test_storage_service_notification() {
// Create a storage service notifier and listener pair
let (storage_service_notifier, mut storage_service_listener) =
new_storage_service_notifier_listener_pair();

// Notify the storage service of a new commit
let highest_synced_version = 500;
storage_service_notifier
.notify_new_commit(highest_synced_version)
.await
.unwrap();

// Verify the storage service received the notification
let commit_notification = storage_service_listener.next().await.unwrap();
assert_eq!(
commit_notification.highest_synced_version,
highest_synced_version
);

// Drop the receiver, send a notification and verify an error is returned
drop(storage_service_listener);
let error = storage_service_notifier
.notify_new_commit(highest_synced_version)
.await
.unwrap_err();
assert_matches!(error, Error::CommitNotificationError(_));
}
}
1 change: 1 addition & 0 deletions state-sync/state-sync-v2/state-sync-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ aptos-runtimes = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-scratchpad = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-storage-service-notifications = { workspace = true }
aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
async-trait = { workspace = true }
Expand Down
Loading

0 comments on commit 05d2b72

Please sign in to comment.