Skip to content

Commit

Permalink
[Data Streaming Service] Add stream update notifier.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jan 16, 2024
1 parent a7a41d5 commit 73751cb
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 13 deletions.
2 changes: 1 addition & 1 deletion config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Default for StateSyncDriverConfig {
continuous_syncing_mode: ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs,
enable_auto_bootstrapping: false,
fallback_to_output_syncing_secs: 180, // 3 minutes
progress_check_interval_ms: 50,
progress_check_interval_ms: 100,
max_connection_deadline_secs: 10,
max_consecutive_stream_notifications: 10,
max_num_stream_timeouts: 12,
Expand Down
77 changes: 77 additions & 0 deletions state-sync/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use crate::{
metrics::{increment_counter, increment_counter_multiple_labels, start_timer},
stream_engine::{DataStreamEngine, StreamEngine},
streaming_client::{NotificationFeedback, StreamRequest},
streaming_service::StreamUpdateNotification,
};
use aptos_channels::aptos_channel;
use aptos_config::config::{AptosDataClientConfig, DataStreamingServiceConfig};
use aptos_data_client::{
global_summary::{AdvertisedData, GlobalDataSummary},
Expand Down Expand Up @@ -77,6 +79,10 @@ pub struct DataStream<T> {
// The engine for this data stream
stream_engine: StreamEngine,

// The stream update notifier (to notify the streaming service that
// the stream has been updated, e.g., data is now ready to be processed).
stream_update_notifier: aptos_channel::Sender<(), StreamUpdateNotification>,

// The current queue of data client requests and pending responses. When the
// request at the head of the queue completes (i.e., we receive a response),
// a data notification can be created and sent along the stream.
Expand Down Expand Up @@ -121,6 +127,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
data_stream_config: DataStreamingServiceConfig,
data_stream_id: DataStreamId,
stream_request: &StreamRequest,
stream_update_notifier: aptos_channel::Sender<(), StreamUpdateNotification>,
aptos_data_client: T,
notification_id_generator: Arc<U64IdGenerator>,
advertised_data: &AdvertisedData,
Expand All @@ -141,6 +148,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
data_stream_id,
aptos_data_client,
stream_engine,
stream_update_notifier,
sent_data_requests: None,
spawned_tasks: vec![],
notifications_to_responses: BTreeMap::new(),
Expand Down Expand Up @@ -374,10 +382,12 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {

// Send the request to the network
let join_handle = spawn_request_task(
self.data_stream_id,
data_client_request,
self.aptos_data_client.clone(),
pending_client_response.clone(),
request_timeout_ms,
self.stream_update_notifier.clone(),
);
self.spawned_tasks.push(join_handle);

Expand Down Expand Up @@ -1386,10 +1396,12 @@ fn extract_response_error(
}

fn spawn_request_task<T: AptosDataClientInterface + Send + Clone + 'static>(
data_stream_id: DataStreamId,
data_client_request: DataClientRequest,
aptos_data_client: T,
pending_response: PendingClientResponse,
request_timeout_ms: u64,
stream_update_notifier: aptos_channel::Sender<(), StreamUpdateNotification>,
) -> JoinHandle<()> {
// Update the requests sent counter
increment_counter(
Expand Down Expand Up @@ -1488,6 +1500,10 @@ fn spawn_request_task<T: AptosDataClientInterface + Send + Clone + 'static>(

// Save the response
pending_response.lock().client_response = Some(client_response);

// Send a notification via the stream update notifier
let stream_update_notification = StreamUpdateNotification::new(data_stream_id);
let _ = stream_update_notifier.push((), stream_update_notification);
})
}

Expand Down Expand Up @@ -1703,3 +1719,64 @@ async fn subscribe_to_transactions_or_outputs_with_proof<
let (context, payload) = client_response.await?.into_parts();
Ok(Response::new(context, ResponsePayload::try_from(payload)?))
}

#[cfg(test)]
mod test {
use super::*;
use crate::tests::utils::MockAptosDataClient;
use aptos_channels::message_queues::QueueStyle;
use futures::StreamExt;
use tokio::time::timeout;

#[tokio::test]
async fn completed_request_notifies_streaming_service() {
// Create a data client request
let data_client_request =
DataClientRequest::NumberOfStates(NumberOfStatesRequest { version: 0 });

// Create a mock data client
let data_client_config = AptosDataClientConfig::default();
let aptos_data_client =
MockAptosDataClient::new(data_client_config, true, false, true, true);

// Create a new pending client response
let pending_client_response = Arc::new(Mutex::new(Box::new(
data_notification::PendingClientResponse::new(data_client_request.clone()),
)));

// Create a stream update notifier and listener
let (stream_update_notifier, mut stream_update_listener) =
aptos_channel::new(QueueStyle::LIFO, 1, None);

// Verify the request is still pending (the request hasn't been sent yet)
assert!(pending_client_response.lock().client_response.is_none());

// Spawn the request task
let data_stream_id = 10101;
let join_handle = spawn_request_task(
data_stream_id,
data_client_request,
aptos_data_client,
pending_client_response.clone(),
1000,
stream_update_notifier.clone(),
);

// Wait for the request to complete
join_handle.await.unwrap();

// Verify the request was completed and we now have a response
assert!(pending_client_response.lock().client_response.is_some());

// Verify that a stream update notification is received
match timeout(Duration::from_secs(5), stream_update_listener.next()).await {
Ok(Some(stream_update_notification)) => {
assert_eq!(stream_update_notification.data_stream_id, data_stream_id);
},
result => panic!(
"Stream update notification was not received! Result: {:?}",
result
),
}
}
}
Loading

0 comments on commit 73751cb

Please sign in to comment.