Skip to content

Commit

Permalink
[State Sync] Add fallback for blocked data streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Apr 7, 2022
1 parent 133d741 commit 4db2f5d
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 25 deletions.
32 changes: 27 additions & 5 deletions state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ pub struct DataStream<T> {
// If this count becomes too large, the stream is evidently blocked (i.e.,
// unable to make progress) and will automatically terminate.
request_failure_count: u64,

// Whether the data stream has encountered an error trying to send a
// notification to the listener. If so, the stream is dead and it will
// stop sending notifications. This handles when clients drop the listener.
send_failure: bool,
}

impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
Expand Down Expand Up @@ -125,6 +130,7 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
notification_id_generator,
stream_end_notification_id: None,
request_failure_count: 0,
send_failure: false,
};

Ok((data_stream, data_stream_listener))
Expand Down Expand Up @@ -261,10 +267,21 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
pending_client_response
}

fn send_data_notification(&self, data_notification: DataNotification) -> Result<(), Error> {
self.notification_sender
.push((), data_notification)
.map_err(|error| Error::UnexpectedErrorEncountered(error.to_string()))
fn send_data_notification(&mut self, data_notification: DataNotification) -> Result<(), Error> {
if let Err(error) = self.notification_sender.push((), data_notification) {
let error = Error::UnexpectedErrorEncountered(error.to_string());
warn!(
(LogSchema::new(LogEntry::StreamNotification)
.stream_id(self.data_stream_id)
.event(LogEvent::Error)
.error(&error)
.message("Failed to send data notification to listener!"))
);
self.send_failure = true;
Err(error)
} else {
Ok(())
}
}

fn send_end_of_stream_notification(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -294,8 +311,9 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
) -> Result<(), Error> {
if self.stream_engine.is_stream_complete()
|| self.request_failure_count >= self.config.max_request_retry
|| self.send_failure
{
if self.stream_end_notification_id.is_none() {
if !self.send_failure && self.stream_end_notification_id.is_none() {
self.send_end_of_stream_notification()?;
}
return Ok(()); // There's nothing left to do
Expand Down Expand Up @@ -574,6 +592,9 @@ impl<T> Drop for DataStream<T> {
/// Listener for a data stream. Holds the receiver of `DataNotification`s
/// together with a counter of consecutive timeouts for this stream.
#[derive(Debug)]
pub struct DataStreamListener {
// Receiver of `DataNotification`s for this listener
notification_receiver: channel::aptos_channel::Receiver<(), DataNotification>,

/// Stores the number of consecutive timeouts encountered when listening to this stream
pub num_consecutive_timeouts: u64,
}

impl DataStreamListener {
Expand All @@ -582,6 +603,7 @@ impl DataStreamListener {
) -> Self {
Self {
notification_receiver,
num_consecutive_timeouts: 0,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,60 @@ async fn test_stream_out_of_order_responses() {
assert_none!(stream_listener.select_next_some().now_or_never());
}

// Verifies the behaviour of a data stream once its listener has been dropped:
// the first attempt to process a response afterwards returns an error, and
// subsequent attempts return `Ok` without recording further notifications.
#[tokio::test]
async fn test_stream_listener_dropped() {
// Create an epoch ending data stream
let streaming_service_config = DataStreamingServiceConfig::default();
let (mut data_stream, mut stream_listener) =
create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END);

// Initialize the data stream
let global_data_summary = create_global_data_summary(1);
data_stream
.initialize_data_requests(global_data_summary.clone())
.unwrap();

// Verify at least 3 requests were sent, but no notifications have been sent yet
let (sent_requests, sent_notifications) = data_stream.get_sent_requests_and_notifications();
assert_ge!(sent_requests.as_ref().unwrap().len(), 3);
assert_eq!(sent_notifications.len(), 0);

// Set a response for the request at index 0 and verify a notification is
// received by the (still alive) listener
set_epoch_ending_response_in_queue(&mut data_stream, 0);
data_stream
.process_data_responses(global_data_summary.clone())
.unwrap();
verify_epoch_ending_notification(
&mut stream_listener,
create_ledger_info(0, MIN_ADVERTISED_EPOCH_END, true),
)
.await;

// Verify a single notification was sent
let (_, sent_notifications) = data_stream.get_sent_requests_and_notifications();
assert_eq!(sent_notifications.len(), 1);

// Drop the listener
drop(stream_listener);

// Set another response at index 0 and verify that processing it now
// returns an error. The sent notification count still increases to 2.
set_epoch_ending_response_in_queue(&mut data_stream, 0);
data_stream
.process_data_responses(global_data_summary.clone())
.unwrap_err();
let (_, sent_notifications) = data_stream.get_sent_requests_and_notifications();
assert_eq!(sent_notifications.len(), 2);

// Set yet another response at index 0 and verify that processing now
// succeeds without sending anything: the notification count stays at 2.
set_epoch_ending_response_in_queue(&mut data_stream, 0);
data_stream
.process_data_responses(global_data_summary.clone())
.unwrap();
let (_, sent_notifications) = data_stream.get_sent_requests_and_notifications();
assert_eq!(sent_notifications.len(), 2);
}

/// Creates an account stream for the given `version`.
fn create_account_stream(
streaming_service_config: DataStreamingServiceConfig,
Expand Down
21 changes: 15 additions & 6 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,25 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
}
}

/// Attempts to fetch a data notification from the active stream
async fn fetch_next_data_notification(&mut self) -> Result<DataNotification, Error> {
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let result =
utils::get_data_notification(max_stream_wait_time_ms, self.active_data_stream.as_mut())
.await;
if matches!(result, Err(Error::CriticalDataStreamTimeout(_))) {
// If the stream has timed out too many times, we need to reset it
warn!("Resetting the currently active data stream due to too many timeouts!");
self.reset_active_stream();
}
result
}

/// Processes any notifications already pending on the active stream
async fn process_active_stream_notifications(&mut self) -> Result<(), Error> {
loop {
// Fetch and process any data notifications
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let data_notification = utils::get_data_notification(
max_stream_wait_time_ms,
self.active_data_stream.as_mut(),
)
.await?;
let data_notification = self.fetch_next_data_notification().await?;
match data_notification.data_payload {
DataPayload::AccountStatesWithProof(account_states_with_proof) => {
self.process_account_states_payload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,28 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
Ok(())
}

/// Attempts to fetch a data notification from the active stream
async fn fetch_next_data_notification(&mut self) -> Result<DataNotification, Error> {
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let result =
utils::get_data_notification(max_stream_wait_time_ms, self.active_data_stream.as_mut())
.await;
if matches!(result, Err(Error::CriticalDataStreamTimeout(_))) {
// If the stream has timed out too many times, we need to reset it
warn!("Resetting the currently active data stream due to too many timeouts!");
self.reset_active_stream();
}
result
}

/// Processes any notifications already pending on the active stream
async fn process_active_stream_notifications(
&mut self,
consensus_sync_request: Arc<Mutex<Option<ConsensusSyncRequest>>>,
) -> Result<(), Error> {
loop {
// Fetch and process any data notifications
let max_stream_wait_time_ms = self.driver_configuration.config.max_stream_wait_time_ms;
let data_notification = utils::get_data_notification(
max_stream_wait_time_ms,
self.active_data_stream.as_mut(),
)
.await?;
let data_notification = self.fetch_next_data_notification().await?;
match data_notification.data_payload {
DataPayload::ContinuousTransactionOutputsWithProof(
ledger_info_with_sigs,
Expand Down
2 changes: 2 additions & 0 deletions state-sync/state-sync-v2/state-sync-driver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub enum Error {
BootstrapNotComplete(String),
#[error("Failed to send callback: {0}")]
CallbackSendFailed(String),
#[error("Timed-out waiting for a data stream too many times.")]
CriticalDataStreamTimeout(String),
#[error("Timed-out waiting for a notification from the data stream. Timeout: {0}")]
DataStreamNotificationTimeout(String),
#[error("Error encountered in the event subscription service: {0}")]
Expand Down
33 changes: 25 additions & 8 deletions state-sync/state-sync-v2/state-sync-driver/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::{sync::Arc, time::Duration};
use storage_interface::{DbReader, StartupInfo};
use tokio::time::timeout;

// TODO(joshlind): make this configurable!
// TODO(joshlind): make these configurable!
const MAX_NUM_DATA_STREAM_TIMEOUTS: u64 = 3;
pub const PENDING_DATA_LOG_FREQ_SECS: u64 = 3;

/// The speculative state that tracks a data stream of transactions or outputs.
Expand Down Expand Up @@ -85,9 +86,12 @@ impl SpeculativeStreamState {
}
}

/// Fetches a data notification from the given data stream listener. Note: this
/// helper assumes the `active_data_stream` exists and throws an error if a
/// notification is not found within the `max_stream_wait_time_ms`.
/// Fetches a data notification from the given data stream listener. Returns an
/// error if the data stream times out after `max_stream_wait_time_ms`. Also,
/// tracks the number of consecutive timeouts to identify when the stream has
/// timed out too many times.
///
/// Note: this assumes the `active_data_stream` exists.
pub async fn get_data_notification(
max_stream_wait_time_ms: u64,
active_data_stream: Option<&mut DataStreamListener>,
Expand All @@ -97,12 +101,25 @@ pub async fn get_data_notification(
let timeout_ms = Duration::from_millis(max_stream_wait_time_ms);
if let Ok(data_notification) = timeout(timeout_ms, active_data_stream.select_next_some()).await
{
// Reset the number of consecutive timeouts for the data stream
active_data_stream.num_consecutive_timeouts = 0;
Ok(data_notification)
} else {
Err(Error::DataStreamNotificationTimeout(format!(
"{:?}",
timeout_ms
)))
// Increase the number of consecutive timeouts for the data stream
active_data_stream.num_consecutive_timeouts += 1;

// Check if we've timed out too many times
if active_data_stream.num_consecutive_timeouts >= MAX_NUM_DATA_STREAM_TIMEOUTS {
Err(Error::CriticalDataStreamTimeout(format!(
"{:?}",
MAX_NUM_DATA_STREAM_TIMEOUTS
)))
} else {
Err(Error::DataStreamNotificationTimeout(format!(
"{:?}",
timeout_ms
)))
}
}
}

Expand Down

0 comments on commit 4db2f5d

Please sign in to comment.