Skip to content

Commit

Permalink
[State Sync] Optimize subscription syncing locks.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 18, 2023
1 parent a52efba commit f936c02
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 175 deletions.
2 changes: 1 addition & 1 deletion config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Default for AptosDataClientConfig {
max_optimistic_fetch_lag_secs: 30, // 30 seconds
max_response_timeout_ms: 60_000, // 60 seconds
max_state_chunk_size: MAX_STATE_CHUNK_SIZE,
max_subscription_lag_secs: 60, // 60 seconds
max_subscription_lag_secs: 30, // 30 seconds
max_transaction_chunk_size: MAX_TRANSACTION_CHUNK_SIZE,
max_transaction_output_chunk_size: MAX_TRANSACTION_OUTPUT_CHUNK_SIZE,
optimistic_fetch_timeout_ms: 5000, // 5 seconds
Expand Down
13 changes: 13 additions & 0 deletions state-sync/state-sync-v2/data-streaming-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use aptos_metrics_core::{
};
use once_cell::sync::Lazy;

// Subscription stream termination labels (used as the "termination_reason"
// metric label when a subscription stream is torn down)
pub const MAX_CONSECUTIVE_REQUESTS_LABEL: &str = "max_consecutive_requests";

// Latency buckets for network latencies (i.e., the defaults only go up
// to 10 seconds, but we usually require more).
const NETWORK_LATENCY_BUCKETS: [f64; 14] = [
Expand Down Expand Up @@ -61,6 +64,16 @@ pub static TERMINATE_DATA_STREAM: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counter for the termination of existing subscription streams.
/// Labeled by the reason the stream was terminated (e.g., the maximum
/// number of consecutive requests was reached, or a request error occurred).
pub static TERMINATE_SUBSCRIPTION_STREAM: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "aptos_data_streaming_service_terminate_subscription_stream",
        "Counters related to the termination of existing subscription streams",
        &["termination_reason"]
    )
    .unwrap()
});

/// Counter for stream progress check errors
pub static CHECK_STREAM_PROGRESS_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ impl ContinuousTransactionStreamEngine {
if subscription_stream_index
>= active_subscription_stream.get_max_subscription_stream_index()
{
// Terminate the stream and update the termination metrics
self.active_subscription_stream = None;
update_terminated_subscription_metrics(metrics::MAX_CONSECUTIVE_REQUESTS_LABEL);
}
}

Expand Down Expand Up @@ -816,8 +818,9 @@ impl ContinuousTransactionStreamEngine {
)));
}

// Reset the active subscription stream
// Reset the active subscription stream and update the metrics
self.active_subscription_stream = None;
update_terminated_subscription_metrics(request_error.get_label());

// Log the error based on the request type
if matches!(
Expand Down Expand Up @@ -2053,3 +2056,8 @@ fn extract_new_versions_and_target(

Ok((num_versions, target_ledger_info))
}

/// Records a subscription stream termination event in the metrics,
/// labeled with the reason the stream was terminated.
fn update_terminated_subscription_metrics(termination_reason: &str) {
    let termination_counter = &metrics::TERMINATE_SUBSCRIPTION_STREAM;
    metrics::increment_counter(termination_counter, termination_reason);
}
167 changes: 107 additions & 60 deletions state-sync/storage-service/server/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use aptos_storage_service_types::{
use aptos_time_service::TimeService;
use aptos_types::transaction::Version;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use dashmap::{mapref::entry::Entry, DashMap};
use lru::LruCache;
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

/// Storage server constants
const INVALID_REQUEST_LOG_FREQUENCY_SECS: u64 = 5; // The frequency to log invalid requests (secs)
Expand All @@ -51,7 +51,7 @@ pub struct Handler<T> {
lru_response_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
request_moderator: Arc<RequestModerator>,
storage: T,
subscriptions: Arc<Mutex<HashMap<PeerNetworkId, SubscriptionStreamRequests>>>,
subscriptions: Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>>,
time_service: TimeService,
}

Expand All @@ -62,7 +62,7 @@ impl<T: StorageReaderInterface> Handler<T> {
lru_response_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
request_moderator: Arc<RequestModerator>,
storage: T,
subscriptions: Arc<Mutex<HashMap<PeerNetworkId, SubscriptionStreamRequests>>>,
subscriptions: Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>>,
time_service: TimeService,
) -> Self {
Self {
Expand Down Expand Up @@ -258,74 +258,94 @@ impl<T: StorageReaderInterface> Handler<T> {
request: StorageServiceRequest,
response_sender: ResponseSender,
) {
// Create a new subscription request
// Create a new subscription request and get the stream ID
let subscription_request =
SubscriptionRequest::new(request.clone(), response_sender, self.time_service.clone());

// Grab the lock on the active subscriptions map
let mut subscriptions = self.subscriptions.lock();

// Get the existing stream ID and the request stream ID
let existing_stream_id =
subscriptions
.get_mut(&peer_network_id)
.map(|subscription_stream_requests| {
subscription_stream_requests.subscription_stream_id()
});
let request_stream_id = subscription_request.subscription_stream_id();

// If the stream already exists, add the request to the stream. Otherwise, create a new one.
if existing_stream_id == Some(request_stream_id) {
// Add the request to the existing stream (the stream IDs match)
if let Some(existing_stream) = subscriptions.get_mut(&peer_network_id) {
if let Err((error, subscription_request)) = existing_stream
.add_subscription_request(storage_service_config, subscription_request)
{
// Something went wrong when adding the request to the stream
sample!(
SampleRate::Duration(Duration::from_secs(INVALID_REQUEST_LOG_FREQUENCY_SECS)),
warn!(LogSchema::new(LogEntry::SubscriptionRequest)
.error(&error)
.peer_network_id(&peer_network_id)
.request(&request)
);
// Update the subscription metrics with the new request
update_new_subscription_metrics(peer_network_id);

// Get the subscription stream entry for the peer. Internally, this will
// lock the entry, to prevent other requests (for the same peer) from
// modifying the subscription stream entry.
let subscription_stream_entry = self.subscriptions.entry(peer_network_id);

// If the entry is empty, or the stream ID does not match the request ID,
// create a new subscription stream for the peer. Otherwise, add the
// request to the existing stream (the stream IDs match!).
match subscription_stream_entry {
Entry::Occupied(mut occupied_entry) => {
// If the stream has a different ID than the request, replace the stream.
// Otherwise, add the request to the existing stream.
let existing_stream_id = occupied_entry.get().subscription_stream_id();
if existing_stream_id != request_stream_id {
// Create a new subscription stream for the peer
let subscription_stream = SubscriptionStreamRequests::new(
subscription_request,
self.time_service.clone(),
);
occupied_entry.replace_entry(subscription_stream);

// Update the subscription metrics
increment_counter(
&metrics::SUBSCRIPTION_EVENTS,
peer_network_id.network_id(),
SUBSCRIPTION_FAILURE.into(),
);

// Notify the client of the failure
self.send_response(
request,
Err(StorageServiceError::InvalidRequest(error.to_string())),
subscription_request.take_response_sender(),
);
return;
update_created_stream_metrics(&peer_network_id);
} else {
// Add the request to the existing stream
if let Err((error, subscription_request)) = occupied_entry
.get_mut()
.add_subscription_request(storage_service_config, subscription_request)
{
// Handle the subscription failure
self.handle_subscription_request_failure(
peer_network_id,
request,
error,
subscription_request,
);
}
}
}
} else {
// Create a new stream (either no stream exists, or we have a new stream ID)
let subscription_stream_requests =
SubscriptionStreamRequests::new(subscription_request, self.time_service.clone());
subscriptions.insert(peer_network_id, subscription_stream_requests);
},
Entry::Vacant(vacant_entry) => {
// Create a new subscription stream for the peer
let subscription_stream = SubscriptionStreamRequests::new(
subscription_request,
self.time_service.clone(),
);
vacant_entry.insert(subscription_stream);

// Update the subscription metrics
increment_counter(
&metrics::SUBSCRIPTION_EVENTS,
peer_network_id.network_id(),
SUBSCRIPTION_NEW_STREAM.into(),
);
// Update the subscription metrics
update_created_stream_metrics(&peer_network_id);
},
}
}

/// Handles a subscription request failure by logging the error,
/// updating the subscription metrics, and notifying the client.
fn handle_subscription_request_failure(
&self,
peer_network_id: PeerNetworkId,
request: StorageServiceRequest,
error: Error,
subscription_request: SubscriptionRequest,
) {
// Something went wrong when adding the request to the stream
sample!(
SampleRate::Duration(Duration::from_secs(INVALID_REQUEST_LOG_FREQUENCY_SECS)),
warn!(LogSchema::new(LogEntry::SubscriptionRequest)
.error(&error)
.peer_network_id(&peer_network_id)
.request(&request)
);
);

// Update the subscription metrics
increment_counter(
&metrics::SUBSCRIPTION_EVENTS,
peer_network_id.network_id(),
SUBSCRIPTION_ADD.into(),
update_failed_subscription_metrics(peer_network_id);

// Notify the client of the failure
self.send_response(
request,
Err(StorageServiceError::InvalidRequest(error.to_string())),
subscription_request.take_response_sender(),
);
}

Expand Down Expand Up @@ -484,6 +504,33 @@ impl<T: StorageReaderInterface> Handler<T> {
}
}

/// Records a subscription event for a newly created subscription stream,
/// tagged with the peer's network ID.
fn update_created_stream_metrics(peer_network_id: &PeerNetworkId) {
    let network_id = peer_network_id.network_id();
    increment_counter(
        &metrics::SUBSCRIPTION_EVENTS,
        network_id,
        SUBSCRIPTION_NEW_STREAM.into(),
    );
}

/// Records a subscription event for a failed stream request,
/// tagged with the peer's network ID.
fn update_failed_subscription_metrics(peer_network_id: PeerNetworkId) {
    let network_id = peer_network_id.network_id();
    increment_counter(
        &metrics::SUBSCRIPTION_EVENTS,
        network_id,
        SUBSCRIPTION_FAILURE.into(),
    );
}

/// Records a subscription event for a new incoming stream request,
/// tagged with the peer's network ID.
fn update_new_subscription_metrics(peer_network_id: PeerNetworkId) {
    let network_id = peer_network_id.network_id();
    increment_counter(
        &metrics::SUBSCRIPTION_EVENTS,
        network_id,
        SUBSCRIPTION_ADD.into(),
    );
}

/// Logs the response sent by storage for a peer request
fn log_storage_response(
storage_request: StorageServiceRequest,
Expand Down
13 changes: 6 additions & 7 deletions state-sync/storage-service/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use handler::Handler;
use lru::LruCache;
use moderator::RequestModerator;
use optimistic_fetch::OptimisticFetchRequest;
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use std::{ops::Deref, sync::Arc, time::Duration};
use storage::StorageReaderInterface;
use thiserror::Error;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -78,9 +78,8 @@ pub struct StorageServiceServer<T> {
// A set of active optimistic fetches for peers waiting for new data
optimistic_fetches: Arc<DashMap<PeerNetworkId, OptimisticFetchRequest>>,

// TODO: Reduce lock contention on the mutex.
// A set of active subscriptions for peers waiting for new data
subscriptions: Arc<Mutex<HashMap<PeerNetworkId, SubscriptionStreamRequests>>>,
subscriptions: Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>>,

// A moderator for incoming peer requests
request_moderator: Arc<RequestModerator>,
Expand Down Expand Up @@ -114,7 +113,7 @@ impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
let lru_response_cache = Arc::new(Mutex::new(LruCache::new(
storage_service_config.max_lru_cache_size as usize,
)));
let subscriptions = Arc::new(Mutex::new(HashMap::new()));
let subscriptions = Arc::new(DashMap::new());
let request_moderator = Arc::new(RequestModerator::new(
aptos_data_client_config,
cached_storage_server_summary.clone(),
Expand Down Expand Up @@ -463,7 +462,7 @@ impl<T: StorageReaderInterface + Send + Sync> StorageServiceServer<T> {
/// Returns a handle to the active subscriptions for test purposes.
/// Note: this clones the `Arc`, not the underlying map, so the caller
/// observes the same live subscription state as the server.
pub(crate) fn get_subscriptions(
    &self,
) -> Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>> {
    self.subscriptions.clone()
}
}
Expand All @@ -478,7 +477,7 @@ async fn handle_active_optimistic_fetches<T: StorageReaderInterface>(
lru_response_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
request_moderator: Arc<RequestModerator>,
storage: T,
subscriptions: Arc<Mutex<HashMap<PeerNetworkId, SubscriptionStreamRequests>>>,
subscriptions: Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>>,
time_service: TimeService,
) {
if let Err(error) = optimistic_fetch::handle_active_optimistic_fetches(
Expand Down Expand Up @@ -510,7 +509,7 @@ async fn handle_active_subscriptions<T: StorageReaderInterface>(
lru_response_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
request_moderator: Arc<RequestModerator>,
storage: T,
subscriptions: Arc<Mutex<HashMap<PeerNetworkId, SubscriptionStreamRequests>>>,
subscriptions: Arc<DashMap<PeerNetworkId, SubscriptionStreamRequests>>,
time_service: TimeService,
) {
if let Err(error) = subscription::handle_active_subscriptions(
Expand Down
Loading

0 comments on commit f936c02

Please sign in to comment.