Skip to content

Commit

Permalink
fix(qe) Add in dispatcher for self_contained queries (prisma#2826)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrensmith authored Apr 4, 2022
1 parent 4c0f1c1 commit 22b8221
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 22 deletions.
13 changes: 6 additions & 7 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::pipeline::QueryPipeline;
use crate::{IrSerializer, Operation, QueryGraph, QueryGraphBuilder, QueryInterpreter, QuerySchemaRef, ResponseData};
use connector::{Connection, ConnectionLike, Connector};
use futures::future;
use tracing_futures::WithSubscriber;

pub async fn execute_single_operation(
query_schema: QuerySchemaRef,
Expand Down Expand Up @@ -63,18 +64,16 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
let mut futures = Vec::with_capacity(operations.len());

let dispatcher = crate::get_current_dispatcher();
for op in operations {
match QueryGraphBuilder::new(query_schema.clone()).build(op.clone()) {
Ok((graph, serializer)) => {
let conn = connector.get_connection().await?;

futures.push(tokio::spawn(execute_self_contained(
conn,
graph,
serializer,
force_transactions,
trace_id.clone(),
)));
futures.push(tokio::spawn(
execute_self_contained(conn, graph, serializer, force_transactions, trace_id.clone())
.with_subscriber(dispatcher.clone()),
));
}

// This looks unnecessary, but is the simplest way to preserve ordering of results for the batch.
Expand Down
19 changes: 19 additions & 0 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::{query_document::Operation, response_ir::ResponseData, schema::QueryS
use async_trait::async_trait;
use connector::Connector;

use tracing::Dispatch;

#[async_trait]
pub trait QueryExecutor: TransactionManager {
/// Executes a single operation and returns its result.
Expand Down Expand Up @@ -68,3 +70,20 @@ pub trait TransactionManager {
/// Rolls back a transaction.
async fn rollback_tx(&self, tx_id: TxId) -> crate::Result<()>;
}

// With the node-api when a future is spawned in a new thread `tokio:spawn` it will not
// use the current dispatcher and its logs will not be captured anymore. We can use this
// method to get the current dispatcher and combine it with `with_subscriber`
// let dispatcher = get_current_dispatcher();
// tokio::spawn(async {
// my_async_ops.await
// }.with_subscriber(dispatcher));
//
//
// Finally, this can be replaced with with_current_collector
// https://github.com/tokio-rs/tracing/blob/master/tracing-futures/src/lib.rs#L234
// once this is in a release

pub fn get_current_dispatcher() -> Dispatch {
tracing::dispatcher::get_default(|current| current.clone())
}
16 changes: 1 addition & 15 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use tokio::{
},
time::{self, Duration},
};
use tracing::Dispatch;
use tracing_futures::WithSubscriber;

use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
Expand Down Expand Up @@ -254,19 +253,6 @@ impl ITXClient {
}
}

// Warning!!! This is not pretty. When the actors are spawned with the node-api
// They lose the current subscriber because it is not a global subscriber
// and are no longer able to send logs. This fix gets the current dispatcher
// and clones it out so that we can use `WithSubscriber` with it.
fn get_current_dispatcher() -> Dispatch {
let mut dispatcher = tracing::Dispatch::default();
tracing::dispatcher::get_default(|current| {
dispatcher = current.clone();
});

dispatcher
}

pub fn spawn_itx_actor(
query_schema: QuerySchemaRef,
tx_id: TxId,
Expand All @@ -284,7 +270,7 @@ pub fn spawn_itx_actor(
};

let mut server = ITXServer::new(tx_id, CachedTx::Open(value), timeout, rx_from_client, query_schema);
let dispatcher = get_current_dispatcher();
let dispatcher = crate::get_current_dispatcher();

tokio::task::spawn(
async move {
Expand Down

0 comments on commit 22b8221

Please sign in to comment.