Skip to content

Commit

Permalink
feat: support different log formats
Browse files Browse the repository at this point in the history
  • Loading branch information
dav1do committed May 21, 2024
1 parent 5fe0754 commit 7bca77d
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 10 deletions.
132 changes: 127 additions & 5 deletions common/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Provides helper functions for initializing telemetry collection and publication.
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock, time::Duration};
use std::{convert::Infallible, net::SocketAddr, str::FromStr, sync::OnceLock, time::Duration};

use anyhow::Result;
use hyper::{
Expand All @@ -19,19 +19,70 @@ use opentelemetry_sdk::{
};
use prometheus::{Encoder, TextEncoder};
use tokio::{sync::oneshot, task::JoinHandle};
use tracing_subscriber::{filter::LevelFilter, prelude::*, EnvFilter, Registry};
use tracing_subscriber::{
filter::LevelFilter,
fmt::{
format::{Compact, DefaultFields, Json, JsonFields, Pretty},
time::SystemTime,
FormatEvent, FormatFields,
},
prelude::*,
EnvFilter, Registry,
};

#[derive(
Clone, Copy, Debug, PartialEq, Eq, Hash, Default, serde::Deserialize, serde::Serialize,
)]
/// The format to use for logging
#[serde(rename_all = "camelCase")]
pub enum LogFormat {
/// Compact format
SingleLine,
/// Pretty format
#[default]
MultiLine,
/// JSON format
Json,
}

impl FromStr for LogFormat {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self> {
match s {
"singleLine" | "single-line" | "SINGLE_LINE" => Ok(LogFormat::SingleLine),
"multiLine" | "multi-line" | "MULTI_LINE" => Ok(LogFormat::MultiLine),
"json" => Ok(LogFormat::Json),
_ => Err(anyhow::anyhow!("invalid log format: {}", s)),
}
}
}

impl std::fmt::Display for LogFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
// we match the clap enum format to make it easier when passing as an argument
LogFormat::SingleLine => write!(f, "single-line"),
LogFormat::MultiLine => write!(f, "multi-line"),
LogFormat::Json => write!(f, "json"),
}
}
}

// create a new prometheus registry
static PROM_REGISTRY: OnceLock<prometheus::Registry> = OnceLock::new();

/// Initialize tracing
pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
pub async fn init_tracing(otlp_endpoint: Option<String>, log_format: LogFormat) -> Result<()> {
//// Setup log filter
//// Default to INFO if no env is specified
let log_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?;

let fields_format = FieldsFormat::new(log_format);
let event_format = EventFormat::new(log_format);

// If we have an otlp_endpoint setup export of traces
if let Some(otlp_endpoint) = otlp_endpoint {
let tracer = opentelemetry_otlp::new_pipeline()
Expand Down Expand Up @@ -67,7 +118,8 @@ pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
// Setup logging to stdout
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.event_format(event_format)
.fmt_fields(fields_format)
.with_filter(log_filter);

let collector = Registry::default().with(telemetry).with(logger);
Expand All @@ -84,7 +136,8 @@ pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
// Setup basic log only tracing
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.event_format(event_format)
.fmt_fields(fields_format)
.with_filter(log_filter);
tracing_subscriber::registry().with(logger).init()
}
Expand Down Expand Up @@ -178,3 +231,72 @@ fn start_metrics_server(addr: &SocketAddr) -> (MetricsServerShutdown, MetricsSer
});
(tx, tokio::spawn(server))
}

// Implement a FormatEvent type that can be configured to one of a set of log formats.
struct EventFormat {
kind: LogFormat,
single: tracing_subscriber::fmt::format::Format<Compact, SystemTime>,
multi: tracing_subscriber::fmt::format::Format<Pretty, SystemTime>,
json: tracing_subscriber::fmt::format::Format<Json, SystemTime>,
}

impl EventFormat {
fn new(kind: LogFormat) -> Self {
Self {
kind,
single: tracing_subscriber::fmt::format().compact(),
multi: tracing_subscriber::fmt::format().pretty(),
json: tracing_subscriber::fmt::format().json(),
}
}
}

impl<S, N> FormatEvent<S, N> for EventFormat
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
N: for<'a> tracing_subscriber::fmt::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
writer: tracing_subscriber::fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
match self.kind {
LogFormat::SingleLine => self.single.format_event(ctx, writer, event),
LogFormat::MultiLine => self.multi.format_event(ctx, writer, event),
LogFormat::Json => self.json.format_event(ctx, writer, event),
}
}
}

// Implement a FormatFields type that can be configured to one of a set of log formats.
struct FieldsFormat {
kind: LogFormat,
default_fields: DefaultFields,
json_fields: JsonFields,
}

impl FieldsFormat {
pub fn new(kind: LogFormat) -> Self {
Self {
kind,
default_fields: DefaultFields::new(),
json_fields: JsonFields::new(),
}
}
}

impl<'writer> FormatFields<'writer> for FieldsFormat {
fn format_fields<R: tracing_subscriber::prelude::__tracing_subscriber_field_RecordFields>(
&self,
writer: tracing_subscriber::fmt::format::Writer<'writer>,
fields: R,
) -> std::fmt::Result {
match self.kind {
LogFormat::SingleLine => self.default_fields.format_fields(writer, fields),
LogFormat::MultiLine => self.default_fields.format_fields(writer, fields),
LogFormat::Json => self.json_fields.format_fields(writer, fields),
}
}
}
8 changes: 7 additions & 1 deletion keramik/src/developing_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ Edit `./k8s/operator/manifests/operator.yaml` to use `IfNotPresent` for the `ima
containers:
- name: keramik-operator
image: "keramik/operator"
imagePullPolicy: IfNotPresent
imagePullPolicy: IfNotPresent # Should be IfNotPresent when using imageTag: dev, but Always if using imageTag: latest
command:
- "/usr/bin/keramik-operator"
# you can use json logs like so
# - "--log-format"
# - "json"
- "daemon"
# ...
```

Expand Down
15 changes: 15 additions & 0 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,18 @@ pub mod utils;
/// A list of constants used in various K8s resources
#[cfg(feature = "controller")]
const CONTROLLER_NAME: &str = "keramik";

static NETWORK_LOG_FORMAT: std::sync::OnceLock<keramik_common::telemetry::LogFormat> =
std::sync::OnceLock::new();

/// Sets the log format for the network
pub fn set_network_log_format(format: keramik_common::telemetry::LogFormat) {
let _ = NETWORK_LOG_FORMAT.get_or_init(|| format);
}

/// Sets the log format for the network. Not public outside of main
pub(crate) fn network_log_format() -> keramik_common::telemetry::LogFormat {
NETWORK_LOG_FORMAT
.get_or_init(keramik_common::telemetry::LogFormat::default)
.to_owned()
}
35 changes: 33 additions & 2 deletions operator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Operator is a long lived process that auotmates creating and managing Ceramic networks.
#![deny(missing_docs)]
use anyhow::Result;
use clap::{command, Parser, Subcommand};
use clap::{arg, command, Parser, Subcommand, ValueEnum};
use keramik_common::telemetry;
use keramik_operator::set_network_log_format;
use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider};
use tracing::info;

Expand All @@ -17,6 +18,30 @@ struct Cli {

#[arg(long, env = "OPERATOR_PROM_BIND", default_value = "0.0.0.0:9464")]
prom_bind: String,

#[arg(long, env = "OPERATOR_LOG_FORMAT")]
log_format: Option<LogFormat>,
}

#[derive(ValueEnum, Debug, Clone)]
/// The format of the logs
pub enum LogFormat {
/// Compact single line logs
SingleLine,
/// Pretty multi-line logs
MultiLine,
/// JSON logs
Json,
}

impl From<LogFormat> for telemetry::LogFormat {
fn from(format: LogFormat) -> telemetry::LogFormat {
match format {
LogFormat::SingleLine => telemetry::LogFormat::SingleLine,
LogFormat::MultiLine => telemetry::LogFormat::MultiLine,
LogFormat::Json => telemetry::LogFormat::Json,
}
}
}

/// Available Subcommands
Expand All @@ -29,7 +54,13 @@ pub enum Command {
#[tokio::main]
async fn main() -> Result<()> {
let args = Cli::parse();
telemetry::init_tracing(args.otlp_endpoint).await?;
let log_format = args
.log_format
.map(telemetry::LogFormat::from)
.unwrap_or_default();
set_network_log_format(log_format);
telemetry::init_tracing(args.otlp_endpoint, log_format).await?;

let (metrics_controller, metrics_server_shutdown, metrics_server_join) =
telemetry::init_metrics_prom(&args.prom_bind.parse()?).await?;
info!("starting operator");
Expand Down
10 changes: 9 additions & 1 deletion operator/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use k8s_openapi::api::{
},
};

use crate::network::{node_affinity::NodeAffinityConfig, BootstrapSpec, PEERS_CONFIG_MAP_NAME};
use crate::{
network::{node_affinity::NodeAffinityConfig, BootstrapSpec, PEERS_CONFIG_MAP_NAME},
network_log_format,
};

// BootstrapConfig defines which properties of the JobSpec can be customized.
pub struct BootstrapConfig {
Expand Down Expand Up @@ -94,6 +97,11 @@ pub fn bootstrap_job_spec(
value: Some("/keramik-peers/peers.json".to_owned()),
..Default::default()
},
EnvVar {
name: "RUNNER_LOG_FORMAT".to_owned(),
value: Some(network_log_format().to_string()),
..Default::default()
},
]),
volume_mounts: Some(vec![VolumeMount {
mount_path: "/keramik-peers".to_owned(),
Expand Down
4 changes: 4 additions & 0 deletions operator/src/network/testdata/bootstrap_job_many_peers_apply
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Request {
{
"name": "BOOTSTRAP_PEERS_PATH",
"value": "/keramik-peers/peers.json"
},
{
"name": "RUNNER_LOG_FORMAT",
"value": "multi-line"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner",
Expand Down
4 changes: 4 additions & 0 deletions operator/src/network/testdata/bootstrap_job_two_peers_apply
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Request {
{
"name": "BOOTSTRAP_PEERS_PATH",
"value": "/keramik-peers/peers.json"
},
{
"name": "RUNNER_LOG_FORMAT",
"value": "multi-line"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner",
Expand Down
27 changes: 26 additions & 1 deletion runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ struct Cli {
default_value = "http://localhost:4317"
)]
otlp_endpoint: String,

/// Specify the format of log events.
#[arg(long, default_value = "multi-line", env = "RUNNER_LOG_FORMAT")]
log_format: LogFormat,
}

#[derive(clap::ValueEnum, Debug, Clone)]
/// The format of the logs
pub enum LogFormat {
/// Compact single line logs
SingleLine,
/// Pretty multi-line logs
MultiLine,
/// JSON logs
Json,
}

impl From<LogFormat> for telemetry::LogFormat {
fn from(format: LogFormat) -> telemetry::LogFormat {
match format {
LogFormat::SingleLine => telemetry::LogFormat::SingleLine,
LogFormat::MultiLine => telemetry::LogFormat::MultiLine,
LogFormat::Json => telemetry::LogFormat::Json,
}
}
}

/// Available Subcommands
Expand Down Expand Up @@ -66,7 +91,7 @@ pub enum CommandResult {
#[tokio::main]
async fn main() -> Result<()> {
let args = Cli::parse();
telemetry::init_tracing(Some(args.otlp_endpoint.clone())).await?;
telemetry::init_tracing(Some(args.otlp_endpoint.clone()), args.log_format.into()).await?;
let metrics_controller = telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?;
info!("starting runner");

Expand Down

0 comments on commit 7bca77d

Please sign in to comment.