Skip to content

Commit

Permalink
[Aptos Data Client] Add request/response tracking for peer buckets.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 12, 2023
1 parent 48bebaa commit 8c81fc6
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 15 deletions.
44 changes: 38 additions & 6 deletions state-sync/aptos-data-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ impl AptosDataClient {
self.storage_service_client.get_peers_and_metadata()
}

/// Refreshes the logs and metrics that describe the peer request distributions.
///
/// This is a thin public entry point: the work itself is delegated to the
/// peer states held by this client.
pub fn update_peer_request_logs_and_metrics(&self) {
    let peer_states = &self.peer_states;
    peer_states.update_peer_request_logs_and_metrics();
}

/// Update a peer's storage summary
pub fn update_peer_storage_summary(&self, peer: PeerNetworkId, summary: StorageServerSummary) {
self.peer_states.update_summary(peer, summary)
Expand Down Expand Up @@ -348,7 +353,7 @@ impl AptosDataClient {
}

/// Returns all priority and regular peers
pub(crate) fn get_priority_and_regular_peers(
pub fn get_priority_and_regular_peers(
&self,
) -> crate::error::Result<(HashSet<PeerNetworkId>, HashSet<PeerNetworkId>), Error> {
// Get all connected peers
Expand Down Expand Up @@ -452,7 +457,10 @@ impl AptosDataClient {
request: StorageServiceRequest,
request_timeout_ms: u64,
) -> crate::error::Result<Response<StorageServiceResponse>, Error> {
// Generate a unique id for the request
let id = self.response_id_generator.next();

// Update the sent request metrics
trace!(
(LogSchema::new(LogEntry::StorageServiceRequest)
.event(LogEvent::SendRequest)
Expand All @@ -461,7 +469,7 @@ impl AptosDataClient {
.peer(&peer)
.request_data(&request))
);
increment_request_counter(&metrics::SENT_REQUESTS, &request.get_label(), peer);
self.update_sent_request_metrics(peer, &request);

// Send the request and process the result
let result = self
Expand All @@ -482,7 +490,8 @@ impl AptosDataClient {
.peer(&peer))
);

increment_request_counter(&metrics::SUCCESS_RESPONSES, &request.get_label(), peer);
// Update the received response metrics
self.update_received_response_metrics(peer, &request);

// For now, record all responses that at least pass the data
// client layer successfully. An alternative might also have the
Expand Down Expand Up @@ -575,9 +584,32 @@ impl AptosDataClient {
.await
}

/// Returns a copy of the peer states for testing
#[cfg(test)]
pub(crate) fn get_peer_states(&self) -> Arc<PeerStates> {
/// Records a response received from `peer` for the given `request`.
///
/// Two places are updated: the global `SUCCESS_RESPONSES` counter (using the
/// request's label) and the received response counter kept for the peer in
/// the peer states.
fn update_received_response_metrics(
    &self,
    peer: PeerNetworkId,
    request: &StorageServiceRequest,
) {
    // Global view: bump the success response counter for this request label
    let request_label = request.get_label();
    increment_request_counter(&metrics::SUCCESS_RESPONSES, &request_label, peer);

    // Per-peer view: let the peer states track the response for this peer
    let peer_states = &self.peer_states;
    peer_states.increment_received_response_counter(peer, request);
}

/// Records a request sent to `peer`.
///
/// Two places are updated: the global `SENT_REQUESTS` counter (using the
/// request's label) and the sent request counter kept for the peer in the
/// peer states.
fn update_sent_request_metrics(&self, peer: PeerNetworkId, request: &StorageServiceRequest) {
    // Global view: bump the sent request counter for this request label
    let request_label = request.get_label();
    increment_request_counter(&metrics::SENT_REQUESTS, &request_label, peer);

    // Per-peer view: let the peer states track the request for this peer
    let peer_states = &self.peer_states;
    peer_states.increment_sent_request_counter(peer, request);
}

/// Returns a handle to the peer states tracked by this client.
///
/// Only the `Arc` is cloned, so the returned handle refers to the same
/// `PeerStates` instance that the client itself uses.
pub fn get_peer_states(&self) -> Arc<PeerStates> {
    Arc::clone(&self.peer_states)
}
}
Expand Down
2 changes: 1 addition & 1 deletion state-sync/aptos-data-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod interface;
mod latency_monitor;
mod logging;
mod metrics;
mod peer_states;
pub mod peer_states;
pub mod poller;
mod utils;

Expand Down
1 change: 1 addition & 0 deletions state-sync/aptos-data-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum LogEvent {
PeerIgnored,
PeerNoLongerIgnored,
PeerPollingError,
PeerRequestResponseCounts,
PeerSelectionError,
PriorityAndRegularPeers,
ResponseError,
Expand Down
27 changes: 27 additions & 0 deletions state-sync/aptos-data-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ pub static SYNC_LATENCIES: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(histogram_opts, &["label"]).unwrap()
});

/// Gauge tracking the number of sent requests, broken down by peer bucket.
///
/// Label dimensions are `peer_bucket_id` and `request_label`. Registration
/// failures are unwrapped, so a failed registration panics when the gauge is
/// first constructed.
pub static SENT_REQUESTS_BY_PEER_BUCKET: Lazy<IntGaugeVec> = Lazy::new(|| {
    let registered_gauge = register_int_gauge_vec!(
        "aptos_data_client_sent_requests_by_peer_bucket",
        "Gauge related to the sent requests by peer buckets",
        &["peer_bucket_id", "request_label"]
    );
    registered_gauge.unwrap()
});

/// Gauge tracking the number of received responses, broken down by peer bucket.
///
/// Label dimensions are `peer_bucket_id` and `request_label`. Registration
/// failures are unwrapped, so a failed registration panics when the gauge is
/// first constructed.
pub static RECEIVED_RESPONSES_BY_PEER_BUCKET: Lazy<IntGaugeVec> = Lazy::new(|| {
    let registered_gauge = register_int_gauge_vec!(
        "aptos_data_client_received_responses_by_peer_bucket",
        "Gauge related to the received responses by peer buckets",
        &["peer_bucket_id", "request_label"]
    );
    registered_gauge.unwrap()
});

/// An enum representing the various types of data that can be
/// fetched via the data client.
pub enum DataType {
Expand Down Expand Up @@ -184,6 +204,13 @@ pub fn set_gauge(counter: &Lazy<IntGaugeVec>, label: &str, value: u64) {
counter.with_label_values(&[label]).set(value as i64);
}

/// Sets the gauge entry identified by the (`bucket`, `label`) label pair to `value`.
///
/// The gauge is set with an `i64`, so any `value` above `i64::MAX` is clamped to
/// `i64::MAX` instead of wrapping around to a negative reading.
pub fn set_gauge_for_bucket(counter: &Lazy<IntGaugeVec>, bucket: &str, label: &str, value: u64) {
    // Saturate rather than reinterpret the bits: a plain `as i64` cast would
    // turn very large counts into negative gauge values.
    let clamped_value = value.min(i64::MAX as u64) as i64;
    counter
        .with_label_values(&[bucket, label])
        .set(clamped_value);
}

/// Starts the timer for the provided histogram and label values.
pub fn start_request_timer(
histogram: &Lazy<HistogramVec>,
Expand Down
Loading

0 comments on commit 8c81fc6

Please sign in to comment.