Skip to content

Commit

Permalink
[data service] Include email and application name in metrics (aptos-l…
Browse files Browse the repository at this point in the history
  • Loading branch information
banool authored May 28, 2024
1 parent f00eb43 commit 61e3231
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 35 deletions.
48 changes: 42 additions & 6 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ pub static LATEST_PROCESSED_VERSION_PER_PROCESSOR: Lazy<IntGaugeVec> = Lazy::new
register_int_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_processed_version",
"Latest processed transaction version",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
Expand All @@ -25,7 +31,13 @@ pub static PROCESSED_VERSIONS_COUNT_PER_PROCESSOR: Lazy<IntCounterVec> = Lazy::n
register_int_counter_vec!(
"indexer_grpc_data_service_with_user_processed_versions",
"Number of transactions that have been processed by data service",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
Expand All @@ -45,7 +57,13 @@ pub static PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR: Lazy<GaugeVec> = Lazy::new(|
register_gauge_vec!(
"indexer_grpc_data_service_with_user_latest_data_latency_in_secs",
"Latency of data service based on latest processed transaction",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
Expand All @@ -55,7 +73,13 @@ pub static CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_connection_count_v2",
"Count of connections that data service has established",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
Expand All @@ -65,7 +89,13 @@ pub static SHORT_CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_grpc_data_service_short_connection_by_user_processor_count",
"Count of the short connections; i.e., < 10 seconds",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
Expand All @@ -76,7 +106,13 @@ pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(
register_int_counter_vec!(
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
"Count of bytes ready to transfer to the client",
&["identifier", "processor"],
&[
"identifier_type",
"identifier",
"email",
"application_name",
"processor"
],
)
.unwrap()
});
45 changes: 17 additions & 28 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use aptos_indexer_grpc_utils::{
config::IndexerGrpcFileStoreConfig,
constants::{
IndexerGrpcRequestMetadata, GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER,
MESSAGE_SIZE_LIMIT,
MESSAGE_SIZE_LIMIT, REQUEST_HEADER_APTOS_APPLICATION_NAME, REQUEST_HEADER_APTOS_EMAIL,
REQUEST_HEADER_APTOS_IDENTIFIER, REQUEST_HEADER_APTOS_IDENTIFIER_TYPE,
},
counters::{log_grpc_step, IndexerGrpcStep, NUM_MULTI_FETCH_OVERLAPPED_VERSIONS},
file_store_operator::FileStoreOperator,
Expand Down Expand Up @@ -61,9 +62,6 @@ const RESPONSE_CHANNEL_SEND_TIMEOUT: Duration = Duration::from_secs(120);

const SHORT_CONNECTION_DURATION_IN_SECS: u64 = 10;

/// This comes from API Gateway. The identifier uniquely identifies the requester, which
/// in the case of indexer-grpc is always an application.
const REQUEST_HEADER_APTOS_IDENTIFIER: &str = "x-aptos-identifier";
const RESPONSE_HEADER_APTOS_CONNECTION_ID_HEADER: &str = "x-aptos-connection-id";
const SERVICE_TYPE: &str = "data_service";

Expand Down Expand Up @@ -139,10 +137,7 @@ impl RawData for RawDataServerWrapper {
_ => return Result::Err(Status::aborted("Invalid request token")),
};
CONNECTION_COUNT
.with_label_values(&[
&request_metadata.request_identifier,
&request_metadata.processor_name,
])
.with_label_values(&request_metadata.get_label_values())
.inc();
let request = req.into_inner();

Expand Down Expand Up @@ -510,10 +505,7 @@ async fn data_fetcher_task(
.map(|t| t.encoded_len())
.sum::<usize>();
BYTES_READY_TO_TRANSFER_FROM_SERVER
.with_label_values(&[
&request_metadata.request_identifier,
&request_metadata.processor_name,
])
.with_label_values(&request_metadata.get_label_values())
.inc_by(bytes_ready_to_transfer as u64);
// 2. Push the data to the response channel, i.e. stream the data to the client.
let current_batch_size = transaction_data.as_slice().len();
Expand All @@ -539,23 +531,14 @@ async fn data_fetcher_task(
Ok(_) => {
// TODO: Reasses whether this metric useful
LATEST_PROCESSED_VERSION_PER_PROCESSOR
.with_label_values(&[
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.with_label_values(&request_metadata.get_label_values())
.set(end_of_batch_version as i64);
PROCESSED_VERSIONS_COUNT_PER_PROCESSOR
.with_label_values(&[
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.with_label_values(&request_metadata.get_label_values())
.inc_by(current_batch_size as u64);
if let Some(data_latency_in_secs) = data_latency_in_secs {
PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR
.with_label_values(&[
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.with_label_values(&request_metadata.get_label_values())
.set(data_latency_in_secs);
}
},
Expand All @@ -582,10 +565,7 @@ async fn data_fetcher_task(
if let Some(start_time) = connection_start_time {
if start_time.elapsed().as_secs() < SHORT_CONNECTION_DURATION_IN_SECS {
SHORT_CONNECTION_COUNT
.with_label_values(&[
request_metadata.request_identifier.as_str(),
request_metadata.processor_name.as_str(),
])
.with_label_values(&request_metadata.get_label_values())
.inc();
}
}
Expand Down Expand Up @@ -848,7 +828,16 @@ fn get_request_metadata(
req: &Request<GetTransactionsRequest>,
) -> tonic::Result<IndexerGrpcRequestMetadata> {
let request_metadata_pairs = vec![
(
"request_identifier_type",
REQUEST_HEADER_APTOS_IDENTIFIER_TYPE,
),
("request_identifier", REQUEST_HEADER_APTOS_IDENTIFIER),
("request_email", REQUEST_HEADER_APTOS_EMAIL),
(
"request_application_name",
REQUEST_HEADER_APTOS_APPLICATION_NAME,
),
("request_token", GRPC_AUTH_TOKEN_HEADER),
("processor_name", GRPC_REQUEST_NAME_HEADER),
];
Expand Down
37 changes: 37 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,50 @@ pub const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization";
// Limit the message size to 15MB. By default the downstream can receive up to 15MB.
pub const MESSAGE_SIZE_LIMIT: usize = 1024 * 1024 * 15;

// These come from API Gateway, see here:
// https://github.com/aptos-labs/api-gateway/blob/0aae1c17fbd0f5e9b50bdb416f62b48d3d1d5e6b/src/common.rs

/// The type of the auth identity. For example, "anonymous IP" or "application" (API
/// key). For now all data service connections must be from an application, but we
/// include this for future-proofing.
pub const REQUEST_HEADER_APTOS_IDENTIFIER_TYPE: &str = "x-aptos-identifier-type";
/// The identifier uniquely identifies the requester. For an application, this is the
/// application ID, a UUID4.
pub const REQUEST_HEADER_APTOS_IDENTIFIER: &str = "x-aptos-identifier";
/// The email of the requester. For an application, this is the email of the user who
/// created the application. When looking at metrics based on this label, you should
/// also parallelize based on the application name. Or just use the identifier.
pub const REQUEST_HEADER_APTOS_EMAIL: &str = "x-aptos-email";
/// The name of the application, e.g. something like "Graffio Testnet".
pub const REQUEST_HEADER_APTOS_APPLICATION_NAME: &str = "x-aptos-application-name";

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct IndexerGrpcRequestMetadata {
pub processor_name: String,
/// See `REQUEST_HEADER_APTOS_IDENTIFIER_TYPE` for more information.
pub request_identifier_type: String,
/// See `REQUEST_HEADER_APTOS_IDENTIFIER` for more information.
pub request_identifier: String,
/// See `REQUEST_HEADER_APTOS_EMAIL` for more information.
pub request_email: String,
/// See `REQUEST_HEADER_APTOS_APPLICATION_NAME` for more information.
pub request_application_name: String,
pub request_connection_id: String,
// Token is no longer needed behind api gateway.
#[deprecated]
pub request_token: String,
}

impl IndexerGrpcRequestMetadata {
/// Get the label values for use with metrics that use these labels. Note, the
/// order must match the order in metrics.rs.
pub fn get_label_values(&self) -> Vec<&str> {
vec![
&self.request_identifier_type,
&self.request_identifier,
&self.request_email,
&self.request_application_name,
&self.processor_name,
]
}
}
5 changes: 4 additions & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,11 @@ pub fn log_grpc_step(
duration_in_secs,
size_in_bytes,
// Request metadata variables
request_identifier = &request_metadata.request_identifier,
processor_name = &request_metadata.processor_name,
request_identifier_type = &request_metadata.request_identifier_type,
request_identifier = &request_metadata.request_identifier,
request_email = &request_metadata.request_email,
request_application_name = &request_metadata.request_application_name,
connection_id = &request_metadata.request_connection_id,
service_type,
step = step.get_step(),
Expand Down

0 comments on commit 61e3231

Please sign in to comment.