Skip to content

Commit

Permalink
[State Sync] Rely on the data streaming client trait (instead of the
Browse files Browse the repository at this point in the history
struct)
  • Loading branch information
JoshLind authored and aptos-bot committed Apr 26, 2022
1 parent da01f5e commit 1a22183
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 38 deletions.
30 changes: 17 additions & 13 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use aptos_types::{
use data_streaming_service::{
data_notification::{DataNotification, DataPayload, NotificationId},
data_stream::DataStreamListener,
streaming_client::{DataStreamingClient, NotificationFeedback, StreamingServiceClient},
streaming_client::{DataStreamingClient, NotificationFeedback},
};
use futures::channel::oneshot;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
Expand Down Expand Up @@ -271,7 +271,7 @@ impl AccountStateSyncer {
}

/// A simple component that manages the bootstrapping of the node
pub struct Bootstrapper<StorageSyncer> {
pub struct Bootstrapper<StorageSyncer, StreamingClient> {
// The component used to sync account states (if downloading accounts)
account_state_syncer: AccountStateSyncer,

Expand All @@ -291,7 +291,7 @@ pub struct Bootstrapper<StorageSyncer> {
speculative_stream_state: Option<SpeculativeStreamState>,

// The client through which to stream data from the Aptos network
streaming_service_client: StreamingServiceClient,
streaming_client: StreamingClient,

// The interface to read from storage
storage: Arc<dyn DbReader>,
Expand All @@ -303,10 +303,14 @@ pub struct Bootstrapper<StorageSyncer> {
verified_epoch_states: VerifiedEpochStates,
}

impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSyncer> {
impl<
StorageSyncer: StorageSynchronizerInterface + Clone,
StreamingClient: DataStreamingClient + Clone,
> Bootstrapper<StorageSyncer, StreamingClient>
{
pub fn new(
driver_configuration: DriverConfiguration,
streaming_service_client: StreamingServiceClient,
streaming_client: StreamingClient,
storage: Arc<dyn DbReader>,
storage_synchronizer: StorageSyncer,
) -> Self {
Expand All @@ -322,7 +326,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
bootstrapped: false,
driver_configuration,
speculative_stream_state: None,
streaming_service_client,
streaming_client,
storage,
storage_synchronizer,
verified_epoch_states,
Expand Down Expand Up @@ -547,7 +551,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.transaction_output_to_sync
.is_none()
{
self.streaming_service_client
self.streaming_client
.get_all_transaction_outputs(
highest_known_ledger_version,
highest_known_ledger_version,
Expand All @@ -556,7 +560,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?
} else {
let start_account_index = Some(self.account_state_syncer.next_account_index_to_commit);
self.streaming_service_client
self.streaming_client
.get_all_accounts(highest_known_ledger_version, start_account_index)
.await?
};
Expand All @@ -581,7 +585,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.expect("No higher epoch ending version known!");
let data_stream = match self.driver_configuration.config.bootstrapping_mode {
BootstrappingMode::ApplyTransactionOutputsFromGenesis => {
self.streaming_service_client
self.streaming_client
.get_all_transaction_outputs(
next_version,
end_version,
Expand All @@ -590,7 +594,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?
}
BootstrappingMode::ExecuteTransactionsFromGenesis => {
self.streaming_service_client
self.streaming_client
.get_all_transactions(
next_version,
end_version,
Expand Down Expand Up @@ -665,7 +669,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
Error::IntegerOverflow("The next epoch end has overflown!".into())
})?;
let epoch_ending_stream = self
.streaming_service_client
.streaming_client
.get_all_epoch_ending_ledger_infos(next_epoch_end)
.await?;
self.active_data_stream = Some(epoch_ending_stream);
Expand Down Expand Up @@ -1229,7 +1233,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
self.reset_active_stream();

utils::handle_end_of_stream_or_invalid_payload(
&mut self.streaming_service_client,
&mut self.streaming_client,
data_notification,
)
.await
Expand All @@ -1244,7 +1248,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
self.reset_active_stream();

utils::terminate_stream_with_feedback(
&mut self.streaming_service_client,
&mut self.streaming_client,
notification_id,
notification_feedback,
)
Expand Down
24 changes: 14 additions & 10 deletions state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use aptos_types::{
use data_streaming_service::{
data_notification::{DataNotification, DataPayload, NotificationId},
data_stream::DataStreamListener,
streaming_client::{DataStreamingClient, NotificationFeedback, StreamingServiceClient},
streaming_client::{DataStreamingClient, NotificationFeedback},
};
use std::{sync::Arc, time::Duration};
use storage_interface::DbReader;

/// A simple component that manages the continuous syncing of the node
pub struct ContinuousSyncer<StorageSyncer> {
pub struct ContinuousSyncer<StorageSyncer, StreamingClient> {
// The currently active data stream (provided by the data streaming service)
active_data_stream: Option<DataStreamListener>,

Expand All @@ -36,7 +36,7 @@ pub struct ContinuousSyncer<StorageSyncer> {
speculative_stream_state: Option<SpeculativeStreamState>,

// The client through which to stream data from the Aptos network
streaming_service_client: StreamingServiceClient,
streaming_client: StreamingClient,

// The interface to read from storage
storage: Arc<dyn DbReader>,
Expand All @@ -45,18 +45,22 @@ pub struct ContinuousSyncer<StorageSyncer> {
storage_synchronizer: StorageSyncer,
}

impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<StorageSyncer> {
impl<
StorageSyncer: StorageSynchronizerInterface + Clone,
StreamingClient: DataStreamingClient + Clone,
> ContinuousSyncer<StorageSyncer, StreamingClient>
{
pub fn new(
driver_configuration: DriverConfiguration,
streaming_service_client: StreamingServiceClient,
streaming_client: StreamingClient,
storage: Arc<dyn DbReader>,
storage_synchronizer: StorageSyncer,
) -> Self {
Self {
active_data_stream: None,
driver_configuration,
speculative_stream_state: None,
streaming_service_client,
streaming_client,
storage,
storage_synchronizer,
}
Expand Down Expand Up @@ -109,7 +113,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
.map(|sync_request| sync_request.get_sync_target());
let active_data_stream = match self.driver_configuration.config.continuous_syncing_mode {
ContinuousSyncingMode::ApplyTransactionOutputs => {
self.streaming_service_client
self.streaming_client
.continuously_stream_transaction_outputs(
next_version,
highest_synced_epoch,
Expand All @@ -118,7 +122,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
.await?
}
ContinuousSyncingMode::ExecuteTransactions => {
self.streaming_service_client
self.streaming_client
.continuously_stream_transactions(
next_version,
highest_synced_epoch,
Expand Down Expand Up @@ -380,7 +384,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
self.reset_active_stream();

utils::handle_end_of_stream_or_invalid_payload(
&mut self.streaming_service_client,
&mut self.streaming_client,
data_notification,
)
.await
Expand All @@ -395,7 +399,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
self.reset_active_stream();

utils::terminate_stream_with_feedback(
&mut self.streaming_service_client,
&mut self.streaming_client,
notification_id,
notification_feedback,
)
Expand Down
17 changes: 9 additions & 8 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aptos_types::waypoint::Waypoint;
use consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification,
};
use data_streaming_service::streaming_client::{NotificationFeedback, StreamingServiceClient};
use data_streaming_service::streaming_client::{DataStreamingClient, NotificationFeedback};
use event_notifications::EventSubscriptionService;
use futures::StreamExt;
use mempool_notifications::MempoolNotificationSender;
Expand Down Expand Up @@ -60,9 +60,9 @@ impl DriverConfiguration {
}

/// The state sync driver that drives synchronization progress
pub struct StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer> {
pub struct StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer, StreamingClient> {
// The component that manages the initial bootstrapping of the node
bootstrapper: Bootstrapper<StorageSyncer>,
bootstrapper: Bootstrapper<StorageSyncer, StreamingClient>,

// The listener for client notifications
client_notification_listener: ClientNotificationListener,
Expand All @@ -74,7 +74,7 @@ pub struct StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer> {
consensus_notification_handler: ConsensusNotificationHandler,

// The component that manages the continuous syncing of the node
continuous_syncer: ContinuousSyncer<StorageSyncer>,
continuous_syncer: ContinuousSyncer<StorageSyncer, StreamingClient>,

// The client for checking the global data summary of our peers
aptos_data_client: DataClient,
Expand Down Expand Up @@ -102,7 +102,8 @@ impl<
DataClient: AptosDataClient + Send + Clone + 'static,
MempoolNotifier: MempoolNotificationSender,
StorageSyncer: StorageSynchronizerInterface + Clone,
> StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer>
StreamingClient: DataStreamingClient + Clone,
> StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer, StreamingClient>
{
pub fn new(
client_notification_listener: ClientNotificationListener,
Expand All @@ -114,18 +115,18 @@ impl<
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
storage_synchronizer: StorageSyncer,
aptos_data_client: DataClient,
streaming_service_client: StreamingServiceClient,
streaming_client: StreamingClient,
storage: Arc<dyn DbReader>,
) -> Self {
let bootstrapper = Bootstrapper::new(
driver_configuration.clone(),
streaming_service_client.clone(),
streaming_client.clone(),
storage.clone(),
storage_synchronizer.clone(),
);
let continuous_syncer = ContinuousSyncer::new(
driver_configuration.clone(),
streaming_service_client,
streaming_client,
storage.clone(),
storage_synchronizer,
);
Expand Down
16 changes: 9 additions & 7 deletions state-sync/state-sync-v2/state-sync-driver/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use aptos_types::{
use data_streaming_service::{
data_notification::{DataNotification, DataPayload, NotificationId},
data_stream::DataStreamListener,
streaming_client::{DataStreamingClient, NotificationFeedback, StreamingServiceClient},
streaming_client::{DataStreamingClient, NotificationFeedback},
};
use event_notifications::EventSubscriptionService;
use futures::StreamExt;
Expand Down Expand Up @@ -131,8 +131,8 @@ pub async fn get_data_notification(
}

/// Terminates the stream with the provided notification ID and feedback
pub async fn terminate_stream_with_feedback(
streaming_service_client: &mut StreamingServiceClient,
pub async fn terminate_stream_with_feedback<StreamingClient: DataStreamingClient + Clone>(
streaming_client: &mut StreamingClient,
notification_id: NotificationId,
notification_feedback: NotificationFeedback,
) -> Result<(), Error> {
Expand All @@ -141,16 +141,18 @@ pub async fn terminate_stream_with_feedback(
notification_feedback, notification_id
)));

streaming_service_client
streaming_client
.terminate_stream_with_feedback(notification_id, notification_feedback)
.await
.map_err(|error| error.into())
}

/// Handles the end of stream notification or an invalid payload by terminating
/// the stream appropriately.
pub async fn handle_end_of_stream_or_invalid_payload(
streaming_service_client: &mut StreamingServiceClient,
pub async fn handle_end_of_stream_or_invalid_payload<
StreamingClient: DataStreamingClient + Clone,
>(
streaming_client: &mut StreamingClient,
data_notification: DataNotification,
) -> Result<(), Error> {
// Terminate the stream with the appropriate feedback
Expand All @@ -159,7 +161,7 @@ pub async fn handle_end_of_stream_or_invalid_payload(
_ => NotificationFeedback::PayloadTypeIsIncorrect,
};
terminate_stream_with_feedback(
streaming_service_client,
streaming_client,
data_notification.notification_id,
notification_feedback,
)
Expand Down

0 comments on commit 1a22183

Please sign in to comment.