Skip to content

Commit

Permalink
introduce prometheus metrics on the primary node (MystenLabs/narwhal#393
Browse files Browse the repository at this point in the history
)

This commit is introducing prometheus metrics for the primary node and a lightweight webserver to expose the metrics under the /metrics endpoint.
  • Loading branch information
akichidis authored Jun 30, 2022
1 parent 14944e5 commit 3fef15a
Show file tree
Hide file tree
Showing 29 changed files with 564 additions and 55 deletions.
2 changes: 1 addition & 1 deletion narwhal/.clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cognitive-complexity-threshold = 100
# types are used for safety encoding
type-complexity-threshold = 10000
# big constructors
too-many-arguments-threshold = 14
too-many-arguments-threshold = 15

disallowed-methods = [
# we use tracing with the log feature instead of the log crate.
Expand Down
4 changes: 4 additions & 0 deletions narwhal/Docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ services:
- "3001" # Port to listen on messages from our worker nodes
ports:
- "8000:8000" # expose the gRPC port to be publicly accessible
- "8100:8010" # expose the port listening to metrics endpoint for prometheus metrics
volumes:
- ./validators:/validators
- ./validators/validator-0/logs:/home/logs
Expand Down Expand Up @@ -57,6 +58,7 @@ services:
- "3001" # Port to listen on messages from our worker nodes
ports:
- "8001:8000" # expose the gRPC port to be publicly accessible
- "8101:8010" # expose the port listening to metrics endpoint for prometheus metrics
volumes:
- ./validators:/validators
- ./validators/validator-1/logs:/home/logs
Expand Down Expand Up @@ -97,6 +99,7 @@ services:
- "3001" # Port to listen on messages from our worker nodes
ports:
- "8002:8000" # expose the gRPC port to be publicly accessible
- "8102:8010" # expose the port listening to metrics endpoint for prometheus metrics
volumes:
- ./validators:/validators
- ./validators/validator-2/logs:/home/logs
Expand Down Expand Up @@ -137,6 +140,7 @@ services:
- "3001" # Port to listen on messages from worker nodes
ports:
- "8003:8000" # expose the gRPC port to be publicly accessible
- "8103:8010" # expose the port listening to metrics endpoint for prometheus metrics
volumes:
- ./validators:/validators
- ./validators/validator-3/logs:/home/logs
Expand Down
5 changes: 4 additions & 1 deletion narwhal/Docker/gen.validators.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ cat > ${target}/parameters.json <<EOF
"max_concurrent_requests": 500000,
"max_header_delay": "2000ms",
"sync_retry_delay": "10_000ms",
"sync_retry_nodes": 3
"sync_retry_nodes": 3,
"prometheus_metrics": {
"socket_addr": "0.0.0.0:8010"
}
}
EOF

Expand Down
2 changes: 1 addition & 1 deletion narwhal/Docker/scripts/gen.committee.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def main():
}
}
}
out = {"authorities": temp}
out = {"authorities": temp, "epoch": 0}
print(json.dumps(out, indent=4))

if __name__ == '__main__':
Expand Down
1 change: 1 addition & 0 deletions narwhal/Docker/templates/node.template
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- "3001" # Port to listen on messages from our worker nodes
ports:
- "80{counter}:8000" # expose the gRPC port to be publicly accessible
- "81{counter}:8010" # expose the port listening to metrics endpoint for prometheus metrics
volumes:
- .:/validators
- ./validator-{counter}/logs:/home/logs
Expand Down
3 changes: 2 additions & 1 deletion narwhal/Docker/validators/committee.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@
}
}
}
}
},
"epoch": 0
}
5 changes: 4 additions & 1 deletion narwhal/Docker/validators/parameters.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@
"max_concurrent_requests": 500000,
"max_header_delay": "2000ms",
"sync_retry_delay": "10_000ms",
"sync_retry_nodes": 3
"sync_retry_nodes": 3,
"prometheus_metrics": {
"socket_addr": "0.0.0.0:8010"
}
}
15 changes: 12 additions & 3 deletions narwhal/benchmark/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ def local(ctx, debug=True):
"get_collections_timeout": "5_000ms",
"remove_collections_timeout": "5_000ms"
},
'max_concurrent_requests': 500_000
'max_concurrent_requests': 500_000,
'prometheus_metrics': {
"socket_addr": "127.0.0.1:0"
}
}
try:
ret = LocalBench(bench_params, node_params).run(debug)
Expand Down Expand Up @@ -81,7 +84,10 @@ def demo(ctx, debug=True):
"max_concurrent_requests": 500_000,
"max_header_delay": "2000ms", # ms
"sync_retry_delay": "10_000ms", # ms
"sync_retry_nodes": 3 # number of nodes
"sync_retry_nodes": 3, # number of nodes
'prometheus_metrics': {
"socket_addr": "127.0.0.1:0"
}
}
try:
ret = Demo(bench_params, node_params).run(debug)
Expand Down Expand Up @@ -192,7 +198,10 @@ def remote(ctx, debug=False):
"get_collections_timeout": "5_000ms",
"remove_collections_timeout": "5_000ms"
},
'max_concurrent_requests': 500_000
'max_concurrent_requests': 500_000,
'prometheus_metrics': {
"socket_addr": "127.0.0.1:0"
}
}
try:
Bench(ctx).run(bench_params, node_params, debug)
Expand Down
40 changes: 37 additions & 3 deletions narwhal/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
collections::{BTreeMap, HashMap},
fs::{self, OpenOptions},
io::{BufWriter, Write as _},
net::SocketAddr,
ops::Deref,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -128,6 +129,24 @@ pub struct Parameters {
pub consensus_api_grpc: ConsensusAPIGrpcParameters,
/// The maximum number of concurrent requests for messages accepted from an un-trusted entity
pub max_concurrent_requests: usize,
/// Properties for the prometheus metrics
pub prometheus_metrics: PrometheusMetricsParameters,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PrometheusMetricsParameters {
/// Socket address the server should be listening to.
pub socket_addr: SocketAddr,
}

impl Default for PrometheusMetricsParameters {
fn default() -> Self {
Self {
socket_addr: format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap(),
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -201,6 +220,7 @@ impl Default for Parameters {
block_synchronizer: BlockSynchronizerParameters::default(),
consensus_api_grpc: ConsensusAPIGrpcParameters::default(),
max_concurrent_requests: 500_000,
prometheus_metrics: PrometheusMetricsParameters::default(),
}
}
}
Expand Down Expand Up @@ -264,7 +284,11 @@ impl Parameters {
info!(
"Max concurrent requests set to {}",
self.max_concurrent_requests
)
);
info!(
"Prometheus metrics server will run on {}",
self.prometheus_metrics.socket_addr
);
}
}

Expand Down Expand Up @@ -525,7 +549,10 @@ mod tests {
"get_collections_timeout": "5_000ms",
"remove_collections_timeout": "5_000ms"
},
"max_concurrent_requests": 500000
"max_concurrent_requests": 500000,
"prometheus_metrics": {
"socket_addr": "127.0.0.1:0"
}
}"#;

// AND temporary file
Expand Down Expand Up @@ -581,6 +608,10 @@ mod tests {
.as_millis(),
5_000
);
assert_eq!(
params.prometheus_metrics.socket_addr.to_string(),
"127.0.0.1:0",
);
}

#[test]
Expand Down Expand Up @@ -617,6 +648,9 @@ mod tests {
));
assert!(logs_contain("Get collections timeout set to 5000 ms"));
assert!(logs_contain("Remove collections timeout set to 5000 ms"));
assert!(logs_contain("Max concurrent requests set to 500000"))
assert!(logs_contain("Max concurrent requests set to 500000"));
assert!(logs_contain(
"Prometheus metrics server will run on 127.0.0.1"
));
}
}
2 changes: 1 addition & 1 deletion narwhal/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ rand = { version = "0.7.3", optional = true }
rocksdb = { version = "0.18.0", features = ["snappy", "lz4", "zstd", "zlib"], default-features = false }
serde = { version = "1.0.137", features = ["derive"] }
serde_bytes = "0.11.6"
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "af52637e73332eba7c8844e9bcdeb5b48854eabf" }
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "16f00004fc7673f84b40615be19f907a0b3e1d5b" }
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["sync"] }
tracing = "0.1.34"
Expand Down
5 changes: 3 additions & 2 deletions narwhal/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ multiaddr = "0.14.0"
types = { path = "../types" }
worker = { path = "../worker" }

store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "af52637e73332eba7c8844e9bcdeb5b48854eabf" }
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "af52637e73332eba7c8844e9bcdeb5b48854eabf" }
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "16f00004fc7673f84b40615be19f907a0b3e1d5b" }
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "16f00004fc7673f84b40615be19f907a0b3e1d5b" }

workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion narwhal/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tonic = { version = "0.7.2", features = ["tls"] }
backoff = { version = "0.4.0", features = ["tokio"] }
multiaddr = "0.14.0"

mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "af52637e73332eba7c8844e9bcdeb5b48854eabf" }
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "16f00004fc7673f84b40615be19f907a0b3e1d5b" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
Expand Down
4 changes: 3 additions & 1 deletion narwhal/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cfg-if = "1.0.0"
clap = "2.34"
futures = "0.3.21"
rand = "0.7.3"
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "af52637e73332eba7c8844e9bcdeb5b48854eabf" }
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "16f00004fc7673f84b40615be19f907a0b3e1d5b" }
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["full"] }
tokio-stream = "0.1.9"
Expand All @@ -33,6 +33,8 @@ primary = { path = "../primary" }
types = { path = "../types" }
worker = { path = "../worker" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
axum = "0.5.10"
prometheus = "0.13.1"

[dev-dependencies]
ed25519-dalek = "1.0.1"
Expand Down
8 changes: 8 additions & 0 deletions narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use consensus::{bullshark::Bullshark, dag::Dag, Consensus, SubscriberHandler};
use crypto::traits::{KeyPair, Signer, VerifyingKey};
use executor::{ExecutionState, Executor, SerializedTransaction, SubscriberResult};
use primary::{NetworkModel, PayloadToken, Primary};
use prometheus::Registry;
use std::sync::Arc;
use store::{
reopen,
Expand All @@ -22,6 +23,8 @@ use types::{
};
use worker::Worker;

pub mod metrics;

/// All the data stores of the node.
pub struct NodeStorage<PublicKey: VerifyingKey> {
pub header_store: Store<HeaderDigest, Header<PublicKey>>,
Expand Down Expand Up @@ -108,6 +111,8 @@ impl Node {
execution_state: Arc<State>,
// A channel to output transactions execution confirmations.
tx_confirmation: Sender<(SubscriberResult<Vec<u8>>, SerializedTransaction)>,
// A prometheus exporter Registry to use for the metrics
registry: &Registry,
) -> SubscriberResult<JoinHandle<()>>
where
PublicKey: VerifyingKey,
Expand Down Expand Up @@ -153,6 +158,7 @@ impl Node {
/* dag */ dag,
network_model,
tx_consensus,
registry,
);

Ok(primary_handle)
Expand Down Expand Up @@ -231,6 +237,8 @@ impl Node {
store: &NodeStorage<PublicKey>,
// The configuration parameters.
parameters: Parameters,
// The prometheus metrics Registry
_registry: &Registry,
) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new();

Expand Down
27 changes: 23 additions & 4 deletions narwhal/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ use executor::{
ExecutionIndices, ExecutionState, ExecutionStateError, SerializedTransaction, SubscriberResult,
};
use futures::future::join_all;
use node::{Node, NodeStorage};
use node::{
metrics::{primary_metrics_registry, start_prometheus_server, worker_metrics_registry},
Node, NodeStorage,
};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver};
use tracing::subscriber::set_global_default;
use tracing::{info, subscriber::set_global_default};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};

#[tokio::main]
Expand Down Expand Up @@ -142,18 +145,23 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
let (tx_transaction_confirmation, rx_transaction_confirmation) =
channel(Node::CHANNEL_CAPACITY);

let registry;

// Check whether to run a primary, a worker, or an entire authority.
let node_handles = match matches.subcommand() {
// Spawn the primary and consensus core.
("primary", Some(sub_matches)) => {
registry = primary_metrics_registry(keypair.public().clone());

let handle = Node::spawn_primary(
keypair,
committee,
&store,
parameters,
parameters.clone(),
/* consensus */ !sub_matches.is_present("consensus-disabled"),
/* execution_state */ Arc::new(SimpleExecutionState),
tx_transaction_confirmation,
&registry,
)
.await?;
vec![handle]
Expand All @@ -167,18 +175,29 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
.parse::<WorkerId>()
.context("The worker id must be a positive integer")?;

registry = worker_metrics_registry(id, keypair.public().clone());

Node::spawn_workers(
/* name */
keypair.public().clone(),
vec![id],
committee,
&store,
parameters,
parameters.clone(),
&registry,
)
}
_ => unreachable!(),
};

// spin up prometheus server exporter
let prom_address = parameters.prometheus_metrics.socket_addr;
info!(
"Starting Prometheus HTTP metrics endpoint at {}",
prom_address
);
let _metrics_server_handle = start_prometheus_server(prom_address, &registry);

// Analyze the consensus' output.
analyze(rx_transaction_confirmation).await;

Expand Down
Loading

0 comments on commit 3fef15a

Please sign in to comment.