Skip to content

Commit

Permalink
[telemetry] add metrics for node connectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
rustielin authored and aptos-bot committed May 10, 2022
1 parent 729ebdb commit 27ac627
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 60 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ futures = "0.3.12"
hex = "0.4.3"
jemallocator = { version = "0.3.2", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
rand = "0.8.3"
regex = "1.5.5"
structopt = "0.3.21"
tokio = { version = "1.8.1", features = ["full"] }
tokio-stream = "0.1.4"
Expand Down
28 changes: 14 additions & 14 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ use aptos_config::{
use aptos_data_client::aptosnet::AptosNetDataClient;
use aptos_infallible::RwLock;
use aptos_logger::{prelude::*, Logger};
use aptos_metrics::{get_public_json_metrics, get_public_metrics, metric_server};
use aptos_metrics::{get_public_json_metrics, metric_server};
use aptos_state_view::account_with_state_view::AsAccountWithStateView;
use aptos_telemetry::{
constants::{APTOS_NODE_PUSH_METRICS, CHAIN_ID_METRIC, PEER_ID_METRIC},
send_data,
constants::{
APTOS_NODE_PUSH_METRICS, CHAIN_ID_METRIC, NODE_PUSH_TIME_SECS, PEER_ID_METRIC,
SYNCED_VERSION_METRIC,
},
send_env_data,
};
use aptos_time_service::TimeService;
use aptos_types::{
Expand All @@ -40,7 +43,6 @@ use futures::channel::mpsc::channel;
use mempool_notifications::MempoolNotificationSender;
use network::application::storage::PeerMetadataStorage;
use network_builder::builder::NetworkBuilder;
use regex::Regex;
use state_sync_multiplexer::{
state_sync_v1_network_config, StateSyncMultiplexer, StateSyncRuntimes,
};
Expand Down Expand Up @@ -391,8 +393,10 @@ fn setup_state_sync_storage_service(

async fn periodic_telemetry_dump(node_config: NodeConfig, db: DbReaderWriter) {
use futures::stream::StreamExt;
let mut dump_interval =
IntervalStream::new(tokio::time::interval(std::time::Duration::from_secs(30))).fuse();
let mut dump_interval = IntervalStream::new(tokio::time::interval(
std::time::Duration::from_secs(NODE_PUSH_TIME_SECS),
))
.fuse();

info!("periodic_telemetry_dump task started");

Expand All @@ -403,13 +407,6 @@ async fn periodic_telemetry_dump(node_config: NodeConfig, db: DbReaderWriter) {
// Build the params from internal prometheus metrics
let mut metrics_params: HashMap<String, String> = HashMap::new();

// Measurement Protocol params must be underscore or alphanumeric
// Prometheus metrics use {} to represent dimensions, so rename it
let met = get_public_metrics();
let re = Regex::new(r"[\{\}=]").unwrap();
for (k, v) in &met {
metrics_params.insert(re.replace_all(k, "_").to_string(), v.to_string());
}
let met = get_public_json_metrics();
for (k, v) in &met {
metrics_params.insert(k.to_string(), v.to_string());
Expand All @@ -421,9 +418,12 @@ async fn periodic_telemetry_dump(node_config: NodeConfig, db: DbReaderWriter) {
Some(p) => p.to_string(),
None => String::new()
};
let synced_version = (&*db.reader).fetch_synced_version().unwrap_or(0);

metrics_params.insert(SYNCED_VERSION_METRIC.to_string(), synced_version.to_string());
metrics_params.insert(CHAIN_ID_METRIC.to_string(), chain_id.to_string());
metrics_params.insert(PEER_ID_METRIC.to_string(), peer_id.to_string());
send_data(APTOS_NODE_PUSH_METRICS.to_string(), peer_id.to_string(), metrics_params).await;
send_env_data(APTOS_NODE_PUSH_METRICS.to_string(), peer_id.to_string(), metrics_params).await;
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions crates/aptos-metrics/src/metric_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
gather_metrics, json_encoder::JsonEncoder, json_metrics::get_json_metrics,
public_metrics::PUBLIC_METRICS, system_metrics::refresh_system_metrics, NUM_METRICS,
gather_metrics,
json_encoder::JsonEncoder,
json_metrics::get_json_metrics,
public_metrics::{PUBLIC_JSON_METRICS, PUBLIC_METRICS},
system_metrics::refresh_system_metrics,
NUM_METRICS,
};
use futures::future;
use hyper::{
Expand Down Expand Up @@ -90,7 +94,7 @@ pub fn get_public_metrics() -> HashMap<String, String> {

pub fn get_public_json_metrics() -> HashMap<&'static str, String> {
let jmet = get_json_metrics();
whitelist_json_metrics(jmet, PUBLIC_METRICS)
whitelist_json_metrics(jmet, PUBLIC_JSON_METRICS)
}

// filtering metrics from the prometheus collections
Expand Down Expand Up @@ -167,7 +171,7 @@ async fn serve_public_metrics(req: Request<Body>) -> Result<Response<Body>, hype
}
(&Method::GET, "/json_metrics") => {
let json_metrics = get_json_metrics();
let whitelist_json_metrics = whitelist_json_metrics(json_metrics, PUBLIC_METRICS);
let whitelist_json_metrics = whitelist_json_metrics(json_metrics, PUBLIC_JSON_METRICS);
let encoded_metrics = serde_json::to_string(&whitelist_json_metrics).unwrap();
*resp.body_mut() = Body::from(encoded_metrics);
}
Expand Down
12 changes: 4 additions & 8 deletions crates/aptos-metrics/src/public_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

// A list of metrics which will be made public
pub const PUBLIC_METRICS: &[&str] = &[
// aptos metrics
"aptos_connections",
"aptos_state_sync_version",
// binary metadata
pub const PUBLIC_METRICS: &[&str] = &["aptos_connections"];

pub const PUBLIC_JSON_METRICS: &[&str] = &[
// git revision of the build
"revision",
// system info
"system_name",
"system_kernel_version",
"system_os_version",
"system_total_memory",
"system_used_memory",
"system_physical_core_count",
Expand Down
1 change: 1 addition & 0 deletions crates/aptos-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ reqwest = { version = "0.11.10", features = ["json"] }
serde = { version = "1.0.137", features = ["derive"], default-features = false }
serde_json = "1.0.81"
sysinfo = "0.23.11"
tokio = { version = "1.8.1" }
uuid = { version = "1.0.0", features = ["v4", "serde"] }

aptos-logger = { path = "../../crates/aptos-logger" }
Expand Down
19 changes: 17 additions & 2 deletions crates/aptos-telemetry/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,27 @@ pub const APTOS_GA_API_SECRET: &str = "ArtslKPTTjeiMi1n-IR39g";
pub const HTTPBIN_URL: &str = "http://httpbin.org/ip";
pub const GA4_URL: &str = "https://www.google-analytics.com/mp/collect";

// Timeouts
pub const NETWORK_PUSH_TIME_SECS: u64 = 30;
pub const NODE_PUSH_TIME_SECS: u64 = 30;

// Metrics events
pub const APTOS_NODE_PUSH_METRICS: &str = "APTOS_NODE_PUSH_METRICS";
pub const APTOS_CLI_PUSH_METRICS: &str = "APTOS_CLI_PUSH_METRICS";
pub const APTOS_NETWORK_PUSH_METRICS: &str = "APTOS_NETWORK_PUSH_METRICS";
pub const APTOS_NODE_PUSH_METRICS: &str = "APTOS_NODE_PUSH_METRICS";

// Metrics names
pub const IP_ADDR_METRIC: &str = "IP_ADDRESS";
// Environment metrics
pub const GIT_REV_METRIC: &str = "GIT_REV";
pub const IP_ADDR_METRIC: &str = "IP_ADDRESS";

// Node metrics
pub const CHAIN_ID_METRIC: &str = "CHAIN_ID";
pub const PEER_ID_METRIC: &str = "PEER_ID";
pub const SYNCED_VERSION_METRIC: &str = "SYNCED_VERSION";

// Network metrics
pub const NETWORK_ID_METRIC: &str = "NETWORK_ID";
pub const ORIGIN_METRIC: &str = "ORIGIN";
pub const PEERS_CONNECTED_METRIC: &str = "PEERS_CONNECTED";
pub const ROLE_METRIC: &str = "ROLE";
63 changes: 39 additions & 24 deletions crates/aptos-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct Ip {
origin: String,
}

pub fn is_disable() -> bool {
pub fn is_disabled() -> bool {
env::var(APTOS_TELEMETRY_DISABLE).is_ok()
}

Expand All @@ -54,28 +54,40 @@ async fn get_ip_origin() -> String {
}
}

pub async fn send_data(event_name: String, user_id: String, event_params: HashMap<String, String>) {
if is_disable() {
pub async fn send_env_data(
event_name: String,
user_id: String,
event_params: HashMap<String, String>,
) {
if is_disabled() {
debug!("Error sending data: disabled Aptos telemetry");
return;
}

// parse environment variables
let api_secret =
env::var(GA_API_SECRET).unwrap_or_else(|_| constants::APTOS_GA_API_SECRET.to_string());
let measurement_id = env::var(GA_MEASUREMENT_ID)
.unwrap_or_else(|_| constants::APTOS_GA_MEASUREMENT_ID.to_string());

// dump event params in a new hashmap with some default params to include
let mut new_event_params: HashMap<String, String> = event_params.clone();
// attempt to get IP address
let ip_origin = get_ip_origin().await;
new_event_params.insert(constants::IP_ADDR_METRIC.to_string(), ip_origin);
new_event_params.insert(constants::GIT_REV_METRIC.to_string(), get_git_rev());
send_data(event_name, user_id, new_event_params).await;
}

pub async fn send_data(event_name: String, user_id: String, event_params: HashMap<String, String>) {
if is_disabled() {
debug!("Error sending data: disabled Aptos telemetry");
return;
}

// parse environment variables
let api_secret =
env::var(GA_API_SECRET).unwrap_or_else(|_| constants::APTOS_GA_API_SECRET.to_string());
let measurement_id = env::var(GA_MEASUREMENT_ID)
.unwrap_or_else(|_| constants::APTOS_GA_MEASUREMENT_ID.to_string());

let metrics_event = MetricsEvent {
name: event_name,
params: new_event_params,
params: event_params,
};

let metrics_dump = MetricsDump {
Expand All @@ -89,18 +101,21 @@ pub async fn send_data(event_name: String, user_id: String, event_params: HashMa
};

let client = reqwest::Client::new();
let res = client
.post(format!(
"{}?&measurement_id={}&api_secret={}",
constants::GA4_URL,
measurement_id,
api_secret
))
.json::<MetricsDump>(&metrics_dump)
.send()
.await;
match res {
Ok(_) => debug!("Sent telemetry data {:?}", &metrics_dump),
Err(e) => debug!("{:?}", e),
}
// do not block on these requests
tokio::spawn(async move {
let res = client
.post(format!(
"{}?&measurement_id={}&api_secret={}",
constants::GA4_URL,
measurement_id,
api_secret
))
.json::<MetricsDump>(&metrics_dump)
.send()
.await;
match res {
Ok(_) => debug!("Sent telemetry data {:?}", &metrics_dump),
Err(e) => debug!("{:?}", e),
}
});
}
2 changes: 1 addition & 1 deletion crates/aptos/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn to_common_result<T: Serialize>(
"None"
};
let metrics = collect_metrics(command, !is_err, latency, error);
aptos_telemetry::send_data(
aptos_telemetry::send_env_data(
APTOS_CLI_PUSH_METRICS.to_string(),
uuid::Uuid::new_v4().to_string(),
metrics,
Expand Down
7 changes: 7 additions & 0 deletions devtools/x/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

fn main() {
// disables telemetry for all x commands by setting environment variable
println!("cargo:rustc-env=APTOS_TELEMETRY_DISABLE={}", 1);
}
1 change: 1 addition & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ aptos-logger = { path = "../crates/aptos-logger" }
aptos-metrics = { path = "../crates/aptos-metrics" }
aptos-proptest-helpers = { path = "../crates/aptos-proptest-helpers", optional = true }
aptos-rate-limiter = { path = "../crates/aptos-rate-limiter"}
aptos-telemetry = { path = "../crates/aptos-telemetry" }
aptos-time-service = { path = "../crates/aptos-time-service", features = ["async"] }
aptos-types = { path = "../types" }
aptos-workspace-hack = { path = "../crates/aptos-workspace-hack" }
Expand Down
Loading

0 comments on commit 27ac627

Please sign in to comment.