Skip to content

Commit

Permalink
[sui-proxy/ add metrics for service] (MystenLabs#10219)
Browse files Browse the repository at this point in the history
Summary:

* we don't have many (only one) metrics for the proxy service itself
* add several metrics to track queue pressure and http related items

Test Plan:

local testing
```
# HELP uptime uptime of the node service in seconds
# TYPE uptime counter
uptime{version="0.0.2-DIRTY"} 3
# HELP consumer_operations Operations counters and status from operations performed in the consumer.
# TYPE consumer_operations counter
consumer_operations{operation="check_response",status="OK"} 8
consumer_operations{operation="encode_compress",status="success"} 8
# HELP histogram_relay_pressure Number of metric families submitted, exported, overflowed to/from the queue.
# TYPE histogram_relay_pressure counter
histogram_relay_pressure{histogram_relay="submit"} 2
histogram_relay_pressure{histogram_relay="submitted"} 586
# HELP histogram_submit_duration_seconds The submit fn latencies in seconds.
# TYPE histogram_submit_duration_seconds histogram
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.005"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.01"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.025"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.05"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.1"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.25"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="0.5"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="1"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="2.5"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="5"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="10"} 2
histogram_submit_duration_seconds_bucket{histogram_relay="submit",le="+Inf"} 2
histogram_submit_duration_seconds_sum{histogram_relay="submit"} 0.000036416
histogram_submit_duration_seconds_count{histogram_relay="submit"} 2
# HELP http_handler_duration_seconds The HTTP request latencies in seconds.
# TYPE http_handler_duration_seconds histogram
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.005"} 0
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.01"} 0
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.025"} 0
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.05"} 0
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.1"} 1
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.25"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="0.5"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="1"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="2.5"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="5"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="10"} 2
http_handler_duration_seconds_bucket{handler="publish_metrics",le="+Inf"} 2
http_handler_duration_seconds_sum{handler="publish_metrics"} 0.174797041
http_handler_duration_seconds_count{handler="publish_metrics"} 2
# HELP http_handler_hits Number of HTTP requests made.
# TYPE http_handler_hits counter
http_handler_hits{handler="publish_metrics"} 2
# HELP json_rpc_duration_seconds The json-rpc latencies in seconds.
# TYPE json_rpc_duration_seconds histogram
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.005"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.01"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.025"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.05"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.1"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.25"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="0.5"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="1"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="2.5"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="5"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="10"} 1
json_rpc_duration_seconds_bucket{rpc_method="suix_getLatestSuiSystemState",le="+Inf"} 1
json_rpc_duration_seconds_sum{rpc_method="suix_getLatestSuiSystemState"} 0
json_rpc_duration_seconds_count{rpc_method="suix_getLatestSuiSystemState"} 1
# HELP json_rpc_state Number of successful/failed requests made.
# TYPE json_rpc_state counter
json_rpc_state{rpc_method="suix_getLatestSuiSystemState",status="success"} 1
json_rpc_state{rpc_method="update_peer_count",status="success"} 4
# HELP protobuf_compression_seconds The time it takes to compress a remote_write payload in seconds.
# TYPE protobuf_compression_seconds histogram
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000001"} 0
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000002"} 0
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000004"} 0
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000008"} 2
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000016"} 2
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000032"} 6
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000064"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000128"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000256"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00000512"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00001024"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00002048"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00004096"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="0.00008192"} 8
protobuf_compression_seconds_bucket{operation="encode_compress",le="+Inf"} 8
protobuf_compression_seconds_sum{operation="encode_compress"} 0.0000018309999999999997
protobuf_compression_seconds_count{operation="encode_compress"} 8
# HELP protobuf_decode_duration_seconds The time it takes to perform various consumer operations in seconds.
# TYPE protobuf_decode_duration_seconds histogram
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0008"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0016"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0032"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0064"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0128"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0256"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.0512"} 0
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.1024"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.2048"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.4096"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="0.8192"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="1"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="1.25"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="1.5"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="1.75"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="2"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="4"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="8"} 2
protobuf_decode_duration_seconds_bucket{operation="convert_to_remote_write",le="+Inf"} 2
protobuf_decode_duration_seconds_sum{operation="convert_to_remote_write"} 0.166288916
protobuf_decode_duration_seconds_count{operation="convert_to_remote_write"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0008"} 0
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0016"} 0
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0032"} 0
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0064"} 0
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0128"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0256"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.0512"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.1024"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.2048"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.4096"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="0.8192"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="1"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="1.25"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="1.5"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="1.75"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="2"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="4"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="8"} 2
protobuf_decode_duration_seconds_bucket{operation="decode_len_delim_protobuf",le="+Inf"} 2
protobuf_decode_duration_seconds_sum{operation="decode_len_delim_protobuf"} 0.020646582
protobuf_decode_duration_seconds_count{operation="decode_len_delim_protobuf"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0008"} 0
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0016"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0032"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0064"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0128"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0256"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.0512"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.1024"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.2048"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.4096"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="0.8192"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="1"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="1.25"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="1.5"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="1.75"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="2"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="4"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="8"} 2
protobuf_decode_duration_seconds_bucket{operation="populate_labels",le="+Inf"} 2
protobuf_decode_duration_seconds_sum{operation="populate_labels"} 0.002759708
protobuf_decode_duration_seconds_count{operation="populate_labels"} 2
```
---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
suiwombat authored Mar 31, 2023
1 parent 4fd8b45 commit 829da08
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 24 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/sui-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sui-proxy"
version = "0.0.1"
version = "0.0.2"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
Expand Down Expand Up @@ -36,6 +36,7 @@ snap = "1.1.0"
rustls = { version = "0.20.4", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.2"
prost = "0.11.8"
once_cell = "1.17"
http-body = "0.4.5"


Expand Down
89 changes: 88 additions & 1 deletion crates/sui-proxy/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,48 @@ use axum::http::StatusCode;
use bytes::buf::Reader;
use fastcrypto::ed25519::Ed25519PublicKey;
use multiaddr::Multiaddr;
use once_cell::sync::Lazy;
use prometheus::proto;
use prometheus::{register_counter_vec, register_histogram_vec};
use prometheus::{CounterVec, HistogramVec};
use prost::Message;
use protobuf::CodedInputStream;
use std::io::Read;
use tracing::{debug, error};

/// Counts consumer operations by operation name and outcome status
/// (e.g. operation="encode_compress", status="success").
static CONSUMER_OPS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "consumer_operations",
        "Operations counters and status from operations performed in the consumer.",
        &["operation", "status"]
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("consumer_operations counter vec should register exactly once")
});
/// Latency histogram for encoding + snappy-compressing a remote_write
/// payload. Buckets are power-of-two multiples of 10ns up to ~82µs; slower
/// samples fall into the implicit +Inf bucket.
static CONSUMER_ENCODE_COMPRESS_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "protobuf_compression_seconds",
        "The time it takes to compress a remote_write payload in seconds.",
        &["operation"],
        vec![
            1e-08, 2e-08, 4e-08, 8e-08, 1.6e-07, 3.2e-07, 6.4e-07, 1.28e-06, 2.56e-06, 5.12e-06,
            1.024e-05, 2.048e-05, 4.096e-05, 8.192e-05
        ],
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("protobuf_compression_seconds histogram vec should register exactly once")
});
/// Latency histogram for the heavier consumer operations (protobuf decode,
/// label population, remote-write conversion). Doubling buckets from 0.8ms
/// to ~0.8s, then coarser steps out to 8s.
static CONSUMER_OPERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "protobuf_decode_duration_seconds",
        "The time it takes to perform various consumer operations in seconds.",
        &["operation"],
        vec![
            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
            1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0
        ],
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("protobuf_decode_duration_seconds histogram vec should register exactly once")
});

/// NodeMetric holds metadata and a metric payload from the calling node
#[derive(Debug)]
pub struct NodeMetric {
Expand All @@ -37,6 +73,9 @@ impl ProtobufDecoder {
}
/// parse a delimited buffer of protobufs. this is used to consume data sent from a sui-node
pub fn parse<T: protobuf::Message>(&mut self) -> Result<Vec<T>> {
let timer = CONSUMER_OPERATION_DURATION
.with_label_values(&["decode_len_delim_protobuf"])
.start_timer();
let mut result: Vec<T> = vec![];
while !self.buf.get_ref().is_empty() {
let len = {
Expand All @@ -47,6 +86,7 @@ impl ProtobufDecoder {
self.buf.read_exact(&mut buf)?;
result.push(T::parse_from_bytes(&buf)?);
}
timer.observe_duration();
Ok(result)
}
}
Expand All @@ -57,6 +97,9 @@ pub fn populate_labels(
network: String,
data: Vec<proto::MetricFamily>,
) -> Vec<proto::MetricFamily> {
let timer = CONSUMER_OPERATION_DURATION
.with_label_values(&["populate_labels"])
.start_timer();
// proto::LabelPair doesn't have pub fields so we can't use
// struct literals to construct
let mut network_label = proto::LabelPair::default();
Expand All @@ -76,13 +119,26 @@ pub fn populate_labels(
m.mut_label().extend(labels.clone());
}
}
timer.observe_duration();
data
}

fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
let observe = || {
let timer = CONSUMER_ENCODE_COMPRESS_DURATION
.with_label_values(&["encode_compress"])
.start_timer();
||{
timer.observe_duration();
}
}();
let mut buf = Vec::new();
buf.reserve(request.encoded_len());
if request.encode(&mut buf).is_err() {
observe();
CONSUMER_OPS
.with_label_values(&["encode_compress", "failed"])
.inc();
error!("unable to encode prompb to mimirpb");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -94,13 +150,21 @@ fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'sta
let compressed = match s.compress_vec(&buf) {
Ok(compressed) => compressed,
Err(error) => {
observe();
CONSUMER_OPS
.with_label_values(&["encode_compress", "failed"])
.inc();
error!("unable to compress to snappy block format; {error}");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unable to compress to snappy block format",
));
}
};
observe();
CONSUMER_OPS
.with_label_values(&["encode_compress", "success"])
.inc();
Ok(compressed)
}

Expand All @@ -110,6 +174,9 @@ async fn check_response(
) -> Result<(), (StatusCode, &'static str)> {
match response.status() {
reqwest::StatusCode::OK => {
CONSUMER_OPS
.with_label_values(&["check_response", "OK"])
.inc();
debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
Ok(())
}
Expand All @@ -123,12 +190,18 @@ async fn check_response(
// see mimir docs on this error condition. it's not actionable from the proxy
// so we drop it.
if body.contains("err-mimir-sample-out-of-order") {
CONSUMER_OPS
.with_label_values(&["check_response", "BAD_REQUEST"])
.inc();
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"IGNORNING METRICS due to err-mimir-sample-out-of-order",
));
}
CONSUMER_OPS
.with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
.inc();
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -141,6 +214,9 @@ async fn check_response(
.text()
.await
.unwrap_or_else(|_| "response body cannot be decoded".into());
CONSUMER_OPS
.with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
.inc();
error!("({}) ERROR: {:?}", code, body);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -154,6 +230,9 @@ pub async fn convert_to_remote_write(
rc: ReqwestClient,
node_metric: NodeMetric,
) -> (StatusCode, &'static str) {
let timer = CONSUMER_OPERATION_DURATION
.with_label_values(&["convert_to_remote_write"])
.start_timer();
for request in Mimir::from(node_metric.data) {
let compressed = match encode_compress(&request) {
Ok(compressed) => compressed,
Expand All @@ -175,7 +254,11 @@ pub async fn convert_to_remote_write(
{
Ok(response) => response,
Err(error) => {
CONSUMER_OPS
.with_label_values(&["check_response", "INTERNAL_SERVER_ERROR"])
.inc();
error!("DROPPING METRICS due to post error: {error}");
timer.observe_duration();
return (
StatusCode::INTERNAL_SERVER_ERROR,
"DROPPING METRICS due to post error",
Expand All @@ -184,8 +267,12 @@ pub async fn convert_to_remote_write(
};
match check_response(request, response).await {
Ok(_) => (),
Err(error) => return error,
Err(error) => {
timer.observe_duration();
return error;
}
}
}
timer.observe_duration();
(StatusCode::CREATED, "created")
}
31 changes: 29 additions & 2 deletions crates/sui-proxy/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,29 @@ use axum::{
http::StatusCode,
};
use multiaddr::Multiaddr;
use once_cell::sync::Lazy;
use prometheus::{register_counter_vec, register_histogram_vec};
use prometheus::{CounterVec, HistogramVec};
use std::net::SocketAddr;

/// Counts HTTP requests received, labeled by handler name.
static HANDLER_HITS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "http_handler_hits",
        "Number of HTTP requests made.",
        &["handler"]
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("http_handler_hits counter vec should register exactly once")
});

/// Latency histogram for HTTP handlers, labeled by handler name.
/// Uses the prometheus crate's default buckets (5ms .. 10s).
static HTTP_HANDLER_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "http_handler_duration_seconds",
        "The HTTP request latencies in seconds.",
        &["handler"]
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("http_handler_duration_seconds histogram vec should register exactly once")
});

/// Publish handler which receives metrics from nodes. Nodes will call us at this endpoint
/// and we relay them to the upstream tsdb
///
Expand All @@ -24,15 +45,21 @@ pub async fn publish_metrics(
Extension(relay): Extension<HistogramRelay>,
LenDelimProtobuf(data): LenDelimProtobuf,
) -> (StatusCode, &'static str) {
HANDLER_HITS.with_label_values(&["publish_metrics"]).inc();
let timer = HTTP_HANDLER_DURATION
.with_label_values(&["publish_metrics"])
.start_timer();
let data = populate_labels(host.name, network, data);
relay.submit(data.clone());
convert_to_remote_write(
let response = convert_to_remote_write(
client.clone(),
NodeMetric {
data,
peer_addr: Multiaddr::from(addr.ip()),
public_key: host.public_key,
},
)
.await
.await;
timer.observe_duration();
response
}
59 changes: 55 additions & 4 deletions crates/sui-proxy/src/histogram_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use anyhow::{bail, Result};
use axum::{extract::Extension, http::StatusCode, routing::get, Router};
use once_cell::sync::Lazy;
use prometheus::proto::{Metric, MetricFamily};
use prometheus::{register_counter_vec, register_histogram_vec};
use prometheus::{CounterVec, HistogramVec};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
collections::VecDeque,
Expand All @@ -14,8 +17,27 @@ use tower_http::trace::{DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
use tracing::{info, Level};

use crate::var;

const METRICS_ROUTE: &str = "/metrics";

/// Tracks queue pressure on the relay: "submit"/"export" count calls,
/// "submitted"/"exported" count metric families moved, and "overflow"
/// counts entries dropped for exceeding the max queue age.
static HISTOGRAM_RELAY_PRESSURE: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        "histogram_relay_pressure",
        "Number of metric families submitted, exported, overflowed to/from the queue.",
        &["histogram_relay"]
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("histogram_relay_pressure counter vec should register exactly once")
});
/// Latency histogram for the relay's submit/export paths, labeled by
/// operation. Uses the prometheus crate's default buckets (5ms .. 10s).
static HISTOGRAM_RELAY_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "histogram_submit_duration_seconds",
        "The submit fn latencies in seconds.",
        &["histogram_relay"]
    )
    // registration only fails on a duplicate name; this static registers once
    .expect("histogram_submit_duration_seconds histogram vec should register exactly once")
});

// Creates a new http server whose sole purpose is to expose
// an endpoint that a prometheus agent can use to poll for the metrics.
// A RegistryService is returned that can be used to get access to prometheus Registries.
Expand Down Expand Up @@ -71,20 +93,45 @@ impl HistogramRelay {
/// in doing so, it will also wrap each entry in a timestamp which will be used
/// for pruning old entries on each submission call. this may not be ideal long term.
pub fn submit(&self, data: Vec<MetricFamily>) {
HISTOGRAM_RELAY_PRESSURE
.with_label_values(&["submit"])
.inc();
let timer = HISTOGRAM_RELAY_DURATION
.with_label_values(&["submit"])
.start_timer();
// represents a collection timestamp
let timestamp_ms = SystemTime::now()
let timestamp_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;

let pressure = data.len();
let mut queue = self
.0
.lock()
.expect("couldn't get mut lock on HistogramRelay");
queue.retain(|v| (timestamp_ms - v.0) < 300000); // drain anything 5 mins or older
queue.push_back(Wrapper(timestamp_ms, data));
queue.retain(|v| {
// 5 mins is the max time in the queue allowed
if (timestamp_secs - v.0) < var!("MAX_QUEUE_TIME_SECS", 300) {
return true;
}
HISTOGRAM_RELAY_PRESSURE
.with_label_values(&["overflow"])
.inc();
false
}); // drain anything 5 mins or older
queue.push_back(Wrapper(timestamp_secs, data));
HISTOGRAM_RELAY_PRESSURE
.with_label_values(&["submitted"])
.inc_by(pressure as f64);
timer.observe_duration();
}
pub fn export(&self) -> Result<String> {
HISTOGRAM_RELAY_PRESSURE
.with_label_values(&["export"])
.inc();
let timer = HISTOGRAM_RELAY_DURATION
.with_label_values(&["export"])
.start_timer();
// totally drain all metrics whenever we get a scrape request from the metrics handler
let mut queue = self
.0
Expand All @@ -109,6 +156,10 @@ impl HistogramRelay {
Ok(s) => s,
Err(error) => bail!("{error}"),
};
HISTOGRAM_RELAY_PRESSURE
.with_label_values(&["exported"])
.inc_by(histograms.len() as f64);
timer.observe_duration();
Ok(string)
}
}
Expand Down
Loading

0 comments on commit 829da08

Please sign in to comment.