Skip to content

Commit

Permalink
Add tracing path from engine to client (prisma#3022)
Browse files Browse the repository at this point in the history
This adds useful tracing spans to the engine to observe the flow
of prisma queries. A tracer is implemented for the node-api that passes
the traces to the log callback function as json so that the prisma client
can process them and add them to the OTEL pipeline.

On the queryengine binary side, the traces are written to stdout via
a new tracer. This again allows the client to read the traces and
add the tracing spans to the OTEL pipeline.


Co-authored-by: daniel starns <[email protected]>
  • Loading branch information
garrensmith and danstarns authored Jul 12, 2022
1 parent e644e90 commit a3a17ac
Show file tree
Hide file tree
Showing 29 changed files with 524 additions and 143 deletions.
65 changes: 37 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ validate:
cargo run --bin test-cli -- validate-datamodel dev_datamodel.prisma

qe:
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics
cargo run --bin query-engine -- --enable-playground --enable-raw-queries --enable-metrics --enable-open-telemetry

qe-dmmf:
cargo run --bin query-engine -- cli dmmf > dmmf.json
Expand Down Expand Up @@ -226,7 +226,6 @@ use-local-query-engine:
show-metrics:
docker-compose -f docker-compose.yml up -d --remove-orphans grafana prometheus


## OpenTelemetry
otel:
docker-compose up --remove-orphans -d otel
16 changes: 13 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,21 @@ services:
- databases

otel:
image: jaegertracing/opentelemetry-all-in-one:latest
image: jaegertracing/all-in-one:1.35
environment:
COLLECTOR_OTLP_ENABLED: "true"
COLLECTOR_ZIPKIN_HOST_PORT: ":9411"
ports:
- 13133:13133
- 6831:6831/udp
- 6832:6832/udp
- 5778:5778
- 16686:16686
- 4317:55680
- 4317:4317
- 4318:4318
- 14250:14250
- 14268:14268
- 14269:14269
- 9411:9411

prometheus:
image: prom/prometheus
Expand Down
4 changes: 2 additions & 2 deletions query-engine/connectors/sql-query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ tokio = "1.0"
tracing = "0.1"
tracing-futures = "0.2"
uuid = "0.8"
opentelemetry = { version = "0.16", features = ["tokio"] }
tracing-opentelemetry = "0.16"
opentelemetry = { version = "0.17", features = ["tokio"] }
tracing-opentelemetry = "0.17.3"

[dependencies.quaint]
features = [
Expand Down
4 changes: 2 additions & 2 deletions query-engine/connectors/sql-query-connector/src/query_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{collections::HashMap, panic::AssertUnwindSafe};
use crate::sql_trace::trace_parent_to_string;

use opentelemetry::trace::TraceContextExt;
use tracing::{span, Span};
use tracing::{info_span, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

impl<'t> QueryExt for connector::Transaction<'t> {}
Expand All @@ -39,7 +39,7 @@ pub trait QueryExt: Queryable + Send + Sync {
idents: &[ColumnMetadata<'_>],
trace_id: Option<String>,
) -> crate::Result<Vec<SqlRow>> {
let span = span!(tracing::Level::INFO, "filter read query");
let span = info_span!("filter read query");

let otel_ctx = span.context();
let span_ref = otel_ctx.span();
Expand Down
6 changes: 3 additions & 3 deletions query-engine/connectors/sql-query-connector/src/sql_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

pub fn trace_parent_to_string(context: &SpanContext) -> String {
let trace_id = context.trace_id().to_hex();
let span_id = context.span_id().to_hex();
let trace_id = context.trace_id();
let span_id = context.span_id();

// see https://www.w3.org/TR/trace-context/#traceparent-header-field-values
format!("traceparent=00-{}-{}-01", trace_id, span_id)
format!("traceparent=00-{:032x}-{:032x}-01", trace_id, span_id)
}

pub trait SqlTraceComment: Sized {
Expand Down
2 changes: 2 additions & 0 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ petgraph = "0.4"
prisma-inflector = { path = "../../libs/prisma-inflector" }
prisma-models = { path = "../prisma-models" }
prisma-value = { path = "../../libs/prisma-value" }
opentelemetry = { version = "0.17"}
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sql-connector = { path = "../connectors/sql-query-connector", package = "sql-query-connector", optional = true }
Expand All @@ -38,6 +39,7 @@ tokio = { version = "1.8" }
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = "0.3.11"
tracing-opentelemetry = "0.17.4"
url = "2"
user-facing-errors = { path = "../../libs/user-facing-errors" }
uuid = "0.8"
Expand Down
18 changes: 16 additions & 2 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use connector::{Connection, ConnectionLike, Connector};
use futures::future;
use metrics::{histogram, increment_counter};
use schema::QuerySchemaRef;
use tracing::Instrument;
use tracing_futures::WithSubscriber;

pub async fn execute_single_operation(
Expand Down Expand Up @@ -64,7 +65,13 @@ pub async fn execute_single_self_contained<C: Connector + Send + Sync>(
force_transactions: bool,
) -> crate::Result<ResponseData> {
let (query_graph, serializer) = QueryGraphBuilder::new(query_schema).build(operation)?;
let conn = connector.get_connection().await?;
let connection_name = connector.name();
let conn_span = info_span!(
"prisma:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = connector.get_connection().instrument(conn_span).await?;
execute_self_contained(conn, query_graph, serializer, force_transactions, trace_id).await
}

Expand All @@ -82,7 +89,14 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(
match QueryGraphBuilder::new(query_schema.clone()).build(op.clone()) {
Ok((graph, serializer)) => {
increment_counter!("query_total_operations");
let conn = connector.get_connection().await?;

let connection_name = connector.name();
let conn_span = info_span!(
"prisma:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = connector.get_connection().instrument(conn_span).await?;

futures.push(tokio::spawn(
execute_self_contained(conn, graph, serializer, force_transactions, trace_id.clone())
Expand Down
16 changes: 15 additions & 1 deletion query-engine/core/src/executor/interpreting_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use connector::Connector;
use schema::QuerySchemaRef;
use tokio::time::{self, Duration};
use tracing_futures::Instrument;

/// Central query executor and main entry point into the query core.
pub struct InterpretingExecutor<C> {
Expand Down Expand Up @@ -84,7 +85,13 @@ where
if let Some(tx_id) = tx_id {
self.itx_manager.batch_execute(&tx_id, operations, trace_id).await
} else if transactional {
let mut conn = self.connector.get_connection().await?;
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction().await?;

let results = execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;
Expand Down Expand Up @@ -126,10 +133,17 @@ where
) -> crate::Result<TxId> {
let id = TxId::default();
trace!("[{}] Starting...", id);
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
Expand Down
Loading

0 comments on commit a3a17ac

Please sign in to comment.