Skip to content

Commit

Permalink
[sui-proxy/ cleanup handler/middleware] (MystenLabs#10100)
Browse files Browse the repository at this point in the history
Summary:

* cleanup the consumer.rs code
* add a new middleware to decode the protobufs
* improve error handling for rpc call to detect general rpc issues

Test Plan:

tested locally

## Description 

Describe the changes or additions included in this PR.

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
suiwombat authored Mar 30, 2023
1 parent 121f00f commit e76948d
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 102 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ snap = "1.1.0"
rustls = { version = "0.20.4", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.2"
prost = "0.11.8"
http-body = "0.4.5"


telemetry-subscribers.workspace = true
Expand Down
178 changes: 95 additions & 83 deletions crates/sui-proxy/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

use crate::admin::ReqwestClient;
use crate::prom_to_mimir::Mimir;
use crate::remote_write::WriteRequest;
use anyhow::Result;
use axum::body::Bytes;
use axum::http::StatusCode;
use bytes::{buf::Reader, Buf};
use bytes::buf::Reader;
use fastcrypto::ed25519::Ed25519PublicKey;
use multiaddr::Multiaddr;
use prometheus::proto;
use prometheus::proto::{self, MetricFamily};
use prost::Message;
use protobuf::CodedInputStream;
use std::io::Read;
Expand All @@ -18,11 +19,11 @@ use tracing::{debug, error};
/// NodeMetric holds metadata and a metric payload from the calling node
#[derive(Debug)]
pub struct NodeMetric {
pub name: String, // the sui node name from the blockchain
pub host: String, // the sui node name from the blockchain
pub network: String, // the sui blockchain name, mainnet
pub peer_addr: Multiaddr, // the sockaddr source address from the incoming request
pub public_key: Ed25519PublicKey, // the public key from the sui blockchain
pub data: Bytes, // raw post data from node
pub data: Vec<MetricFamily>, // decoded protobuf of prometheus data
}

/// The ProtobufDecoder will decode message delimited protobuf messages from prom_model.proto types
Expand Down Expand Up @@ -52,64 +53,109 @@ impl ProtobufDecoder {
}
}

pub async fn convert_to_remote_write(
rc: ReqwestClient,
nm: NodeMetric,
) -> (StatusCode, &'static str) {
let mut decoder = ProtobufDecoder::new(nm.data.reader());
let mut decoded = match decoder.parse::<proto::MetricFamily>() {
Ok(metrics) => metrics,
Err(error) => {
error!("unable to decode Vec<MetricFamily> from bytes provided by node; {error}");
return (
StatusCode::BAD_REQUEST,
"unable to decode Vec<MetricFamily> from bytes provided by node",
);
}
};

// populate labels in place for our given metric family data
fn populate_labels(node_metric: NodeMetric) -> Vec<MetricFamily> {
// proto::LabelPair doesn't have pub fields so we can't use
// struct literals to construct
let mut network = proto::LabelPair::default();
network.set_name("network".into());
network.set_value(nm.network);
let mut network_label = proto::LabelPair::default();
network_label.set_name("network".into());
network_label.set_value(node_metric.network);

let mut host = proto::LabelPair::default();
host.set_name("host".into());
host.set_value(nm.name);
let mut host_label = proto::LabelPair::default();
host_label.set_name("host".into());
host_label.set_value(node_metric.host);

let labels = vec![network, host];
let labels = vec![network_label, host_label];

let mut data = node_metric.data;
// add our extra labels to our incoming metric data
for mf in decoded.iter_mut() {
for mf in data.iter_mut() {
for m in mf.mut_metric() {
m.mut_label().extend(labels.clone());
}
}
data
}

fn encode_compress(request: &WriteRequest) -> Result<Vec<u8>, (StatusCode, &'static str)> {
let mut buf = Vec::new();
buf.reserve(request.encoded_len());
let Ok(()) = request.encode(&mut buf) else {
error!("unable to encode prompb to mimirpb");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unable to encode prompb to remote_write pb",
));
};

for timeseries in Mimir::from(decoded) {
let mut buf = Vec::new();
buf.reserve(timeseries.encoded_len());
let Ok(()) = timeseries.encode(&mut buf) else {
error!("unable to encode prompb to mimirpb");
return (
let mut s = snap::raw::Encoder::new();
let compressed = match s.compress_vec(&buf) {
Ok(compressed) => compressed,
Err(error) => {
error!("unable to compress to snappy block format; {error}");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unable to encode prompb to remote_write pb",
);
};
"unable to compress to snappy block format",
));
}
};
Ok(compressed)
}

let mut s = snap::raw::Encoder::new();
let compressed = match s.compress_vec(&buf) {
Ok(compressed) => compressed,
Err(error) => {
error!("unable to compress to snappy block format; {error}");
return (
async fn check_response(
request: WriteRequest,
response: reqwest::Response,
) -> Result<(), (StatusCode, &'static str)> {
match response.status() {
reqwest::StatusCode::OK => {
debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, request);
Ok(())
}
reqwest::StatusCode::BAD_REQUEST => {
error!("TRIED: {:?}", request);
let body = response
.text()
.await
.unwrap_or_else(|_| "response body cannot be decoded".into());

if body.contains("err-mimir-sample-out-of-order") {
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unable to compress to snappy block format",
);
"IGNORNING METRICS due to err-mimir-sample-out-of-order",
));
}
};
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unknown bad request error encountered in remote_push",
))
}
code => {
error!("TRIED: {:?}", request);
let body = response
.text()
.await
.unwrap_or_else(|_| "response body cannot be decoded".into());
error!("({}) ERROR: {:?}", code, body);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"unknown error encountered in remote_push",
))
}
}
}

pub async fn convert_to_remote_write(
rc: ReqwestClient,
node_metric: NodeMetric,
) -> (StatusCode, &'static str) {
let data = populate_labels(node_metric);
for request in Mimir::from(data) {
let compressed = match encode_compress(&request) {
Ok(compressed) => compressed,
Err(error) => return error,
};
let response = match rc
.client
.post(rc.settings.url.to_owned())
Expand All @@ -133,43 +179,9 @@ pub async fn convert_to_remote_write(
);
}
};

match response.status() {
reqwest::StatusCode::OK => {
debug!("({}) SUCCESS: {:?}", reqwest::StatusCode::OK, timeseries);
}
reqwest::StatusCode::BAD_REQUEST => {
error!("TRIED: {:?}", timeseries);
let body = response
.text()
.await
.unwrap_or_else(|_| "response body cannot be decoded".into());

if body.contains("err-mimir-sample-out-of-order") {
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"IGNORNING METRICS due to err-mimir-sample-out-of-order",
);
}
error!("({}) ERROR: {:?}", reqwest::StatusCode::BAD_REQUEST, body);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"unknown bad request error encountered in remote_push",
);
}
code => {
error!("TRIED: {:?}", timeseries);
let body = response
.text()
.await
.unwrap_or_else(|_| "response body cannot be decoded".into());
error!("({}) ERROR: {:?}", code, body);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"unknown error encountered in remote_push",
);
}
match check_response(request, response).await {
Ok(_) => (),
Err(error) => return error,
}
}
(StatusCode::CREATED, "created")
Expand Down
18 changes: 5 additions & 13 deletions crates/sui-proxy/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,30 @@
// SPDX-License-Identifier: Apache-2.0
use crate::admin::ReqwestClient;
use crate::consumer::{convert_to_remote_write, NodeMetric};
use crate::middleware::LenDelimProtobuf;
use crate::peers::SuiPeer;
use axum::{
body::Body,
extract::{ConnectInfo, Extension},
http::{Request, StatusCode},
http::StatusCode,
};
use multiaddr::Multiaddr;
use std::net::SocketAddr;

/// Publish handler which receives metrics from nodes. Nodes will call us at this endpoint
/// and we relay them to the upstream tsdb
///
/// An mpsc is used within this handler so that we can immediately return an accept to calling nodes.
/// Downstream processing failures may still result in metrics being dropped.
/// Clients will receive a response after successfully relaying the metrics upstream
pub async fn publish_metrics(
Extension(network): Extension<String>,
Extension(client): Extension<ReqwestClient>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Extension(peer): Extension<SuiPeer>,
request: Request<Body>,
LenDelimProtobuf(data): LenDelimProtobuf,
) -> (StatusCode, &'static str) {
let data = match hyper::body::to_bytes(request.into_body()).await {
Ok(data) => data,
Err(_e) => {
return (StatusCode::BAD_REQUEST, "unable to extract post body");
}
};

convert_to_remote_write(
client.clone(),
NodeMetric {
name: peer.name,
host: peer.name,
network,
data,
peer_addr: Multiaddr::from(addr.ip()),
Expand Down
43 changes: 40 additions & 3 deletions crates/sui-proxy/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::peers::SuiNodeProvider;
use crate::{consumer::ProtobufDecoder, peers::SuiNodeProvider};
use axum::{
extract::Extension,
async_trait,
body::Bytes,
extract::{Extension, FromRequest},
headers::ContentType,
http::{Request, StatusCode},
middleware::Next,
response::Response,
TypedHeader,
BoxError, TypedHeader,
};
use bytes::Buf;
use prometheus::proto::MetricFamily;
use std::sync::Arc;
use sui_tls::TlsConnectionInfo;
use tracing::error;
Expand Down Expand Up @@ -44,3 +48,36 @@ pub async fn expect_valid_public_key<B>(
request.extensions_mut().insert(peer);
Ok(next.run(request).await)
}

// extractor that shows how to consume the request body upfront
#[derive(Debug)]
pub struct LenDelimProtobuf(pub Vec<MetricFamily>);

#[async_trait]
impl<S, B> FromRequest<S, B> for LenDelimProtobuf
where
S: Send + Sync,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = (StatusCode, String);

async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
let body = Bytes::from_request(req, state).await.map_err(|e| {
let msg = format!("error extracting bytes; {e}");
error!(msg);
(StatusCode::BAD_REQUEST, msg)
})?;

let mut decoder = ProtobufDecoder::new(body.reader());
let decoded = decoder.parse::<MetricFamily>().map_err(|e| {
let msg = format!("unable to decode len deliminated protobufs; {e}");
error!(msg);
(StatusCode::BAD_REQUEST, msg)
})?;

// req.extensions_mut().insert(decoded);
Ok(Self(decoded))
}
}
14 changes: 11 additions & 3 deletions crates/sui-proxy/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,25 @@ impl SuiNodeProvider {
.body(request.to_string())
.send()
.await
.context("unable to perform rpc")?;
.context("unable to perform json rpc")?;

let raw = response
.bytes()
.await
.context("unable to extract body bytes from json rpc")?;

#[derive(Debug, Deserialize)]
struct ResponseBody {
result: SuiSystemStateSummary,
}

let body = match response.json::<ResponseBody>().await {
let body: ResponseBody = match serde_json::from_slice(&raw) {
Ok(b) => b,
Err(error) => {
bail!("unable to decode json: {error}")
bail!(
"unable to decode json: {error} response from json rpc: {:?}",
raw
)
}
};

Expand Down

0 comments on commit e76948d

Please sign in to comment.