Skip to content

Commit

Permalink
[State Sync] Make max_stream_wait_time_ms configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 22, 2022
1 parent 6028e67 commit f825edc
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
2 changes: 2 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub struct StateSyncDriverConfig {
pub enable_state_sync_v2: bool, // If the node should sync with state sync v2
pub continuous_syncing_mode: ContinuousSyncingMode, // The mode by which to sync after bootstrapping
pub progress_check_interval_ms: u64, // The interval (ms) at which to check state sync progress
pub max_stream_wait_time_ms: u64, // The max time (ms) to wait for a data stream notification
}

/// The default state sync driver config will be the one that gets (and keeps)
Expand All @@ -91,6 +92,7 @@ impl Default for StateSyncDriverConfig {
enable_state_sync_v2: false,
continuous_syncing_mode: ContinuousSyncingMode::ApplyTransactionOutputs,
progress_check_interval_ms: 100,
max_stream_wait_time_ms: 2000,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,12 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
async fn process_active_stream_notifications(&mut self) -> Result<(), Error> {
loop {
// Fetch and process any data notifications
let data_notification =
utils::get_data_notification(self.active_data_stream.as_mut()).await?;
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let data_notification = utils::get_data_notification(
max_stream_wait_time_ms,
self.active_data_stream.as_mut(),
)
.await?;
match data_notification.data_payload {
DataPayload::AccountStatesWithProof(account_states_with_proof) => {
self.process_account_states_payload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
) -> Result<(), Error> {
loop {
// Fetch and process any data notifications
let data_notification =
utils::get_data_notification(self.active_data_stream.as_mut()).await?;
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let data_notification = utils::get_data_notification(
max_stream_wait_time_ms,
self.active_data_stream.as_mut(),
)
.await?;
match data_notification.data_payload {
DataPayload::ContinuousTransactionOutputsWithProof(
ledger_info_with_sigs,
Expand Down
8 changes: 3 additions & 5 deletions state-sync/state-sync-v2/state-sync-driver/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ use std::{sync::Arc, time::Duration};
use storage_interface::{DbReader, StartupInfo};
use tokio::time::timeout;

// TODO(joshlind): make this configurable
const MAX_NOTIFICATION_WAIT_TIME_MS: u64 = 500;

/// The speculative state that tracks a data stream of transactions or outputs.
/// This assumes all data is valid and allows the driver to speculatively verify
/// payloads flowing along the stream without having to block on the executor or
Expand Down Expand Up @@ -84,13 +81,14 @@ impl SpeculativeStreamState {

/// Fetches a data notification from the given data stream listener. Note: this
/// helper assumes the `active_data_stream` exists and throws an error if a
/// notification is not found within the timeout.
/// notification is not found within the `max_stream_wait_time_ms`.
pub async fn get_data_notification(
max_stream_wait_time_ms: u64,
active_data_stream: Option<&mut DataStreamListener>,
) -> Result<DataNotification, Error> {
let active_data_stream = active_data_stream.expect("The active data stream should exist!");

let timeout_ms = Duration::from_millis(MAX_NOTIFICATION_WAIT_TIME_MS);
let timeout_ms = Duration::from_millis(max_stream_wait_time_ms);
if let Ok(data_notification) = timeout(timeout_ms, active_data_stream.select_next_some()).await
{
Ok(data_notification)
Expand Down

0 comments on commit f825edc

Please sign in to comment.