Skip to content

Commit

Permalink
Merge pull request scylladb#668 from piodul/extended-request-spans
Browse files Browse the repository at this point in the history
Extended request spans
  • Loading branch information
cvybhu authored Mar 22, 2023
2 parents 8d09a8c + afb60ff commit dc1c83e
Show file tree
Hide file tree
Showing 14 changed files with 461 additions and 137 deletions.
2 changes: 1 addition & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> Result<()> {
.await?;

let serialized_pk = (pk,).serialized()?.into_owned();
let t = Murmur3Partitioner::hash(prepared.compute_partition_key(&serialized_pk)?).value;
let t = Murmur3Partitioner::hash(&prepared.compute_partition_key(&serialized_pk)?).value;

let statement_info = load_balancing::RoutingInfo {
token: Some(scylla::routing::Token { value: t }),
Expand Down
5 changes: 5 additions & 0 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ pub struct Rows {
pub metadata: ResultMetadata,
pub rows_count: usize,
pub rows: Vec<Row>,
/// Original size of the serialized rows.
pub serialized_size: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -836,6 +838,8 @@ pub fn deser_cql_value(typ: &ColumnType, buf: &mut &[u8]) -> StdResult<CqlValue,
fn deser_rows(buf: &mut &[u8]) -> StdResult<Rows, ParseError> {
let metadata = deser_result_metadata(buf)?;

let original_size = buf.len();

// TODO: the protocol allows an optimization (which must be explicitly requested on query by
// the driver) where the column metadata is not sent with the result.
// Implement this optimization. We'll then need to take the column types by a parameter.
Expand All @@ -861,6 +865,7 @@ fn deser_rows(buf: &mut &[u8]) -> StdResult<Rows, ParseError> {
metadata,
rows_count,
rows,
serialized_size: original_size - buf.len(),
})
}

Expand Down
4 changes: 4 additions & 0 deletions scylla-cql/src/frame/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ impl SerializedValues {
pub fn len(&self) -> i16 {
self.values_num
}

pub fn size(&self) -> usize {
self.serialized_values.len()
}
}

#[derive(Clone, Copy)]
Expand Down
19 changes: 15 additions & 4 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -186,7 +187,7 @@ impl Cluster {
};

let (fut, worker_handle) = worker.work().remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

let result = Cluster {
data: cluster_data,
Expand Down Expand Up @@ -432,11 +433,21 @@ impl ClusterData {
Some(buf.into())
}
};
Ok(partitioner.hash(serialized_pk.unwrap_or_default()))
Ok(partitioner.hash(&serialized_pk.unwrap_or_default()))
}

/// Access to replicas owning a given token
pub fn get_token_endpoints(&self, keyspace: &str, token: Token) -> Vec<Arc<Node>> {
self.get_token_endpoints_iter(keyspace, token)
.cloned()
.collect()
}

pub(crate) fn get_token_endpoints_iter(
&self,
keyspace: &str,
token: Token,
) -> impl Iterator<Item = &Arc<Node>> {
let keyspace = self.keyspaces.get(keyspace);
let strategy = keyspace
.map(|k| &k.strategy)
Expand All @@ -445,7 +456,7 @@ impl ClusterData {
.replica_locator()
.replicas_for_token(token, strategy, None);

replica_set.into_iter().cloned().collect()
replica_set.into_iter()
}

/// Access to replicas owning a given partition key (similar to `nodetool getendpoints`)
Expand Down Expand Up @@ -523,7 +534,7 @@ impl ClusterWorker {

let cluster_data = self.cluster_data.load_full();
let use_keyspace_future = Self::handle_use_keyspace_request(cluster_data, request);
tokio::spawn(use_keyspace_future);
tokio::spawn(use_keyspace_future.with_current_subscriber());
},
None => return, // If use_keyspace_channel was closed then cluster was dropped, we can stop working
}
Expand Down
11 changes: 7 additions & 4 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWrite
use tokio::net::{TcpSocket, TcpStream};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::instrument::WithSubscriber;
use tracing::{debug, error, trace, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -192,13 +193,14 @@ impl NonErrorQueryResponse {
}

pub fn into_query_result(self) -> Result<QueryResult, QueryError> {
let (rows, paging_state, col_specs) = match self.response {
let (rows, paging_state, col_specs, serialized_size) = match self.response {
NonErrorResponse::Result(result::Result::Rows(rs)) => (
Some(rs.rows),
rs.metadata.paging_state,
rs.metadata.col_specs,
rs.serialized_size,
),
NonErrorResponse::Result(_) => (None, None, vec![]),
NonErrorResponse::Result(_) => (None, None, vec![], 0),
_ => {
return Err(QueryError::ProtocolError(
"Unexpected server response, expected Result or Error",
Expand All @@ -212,6 +214,7 @@ impl NonErrorQueryResponse {
tracing_id: self.tracing_id,
paging_state,
col_specs,
serialized_size,
})
}
}
Expand Down Expand Up @@ -858,7 +861,7 @@ impl Connection {
orphan_notification_receiver,
)
.remote_handle();
tokio::task::spawn(task);
tokio::task::spawn(task.with_current_subscriber());
return Ok(handle);
}

Expand All @@ -870,7 +873,7 @@ impl Connection {
orphan_notification_receiver,
)
.remote_handle();
tokio::task::spawn(task);
tokio::task::spawn(task.with_current_subscriber());
Ok(handle)
}

Expand Down
22 changes: 13 additions & 9 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, trace, warn};

/// The target size of a per-node connection pool.
Expand Down Expand Up @@ -212,7 +213,7 @@ impl NodeConnectionPool {

let conns = refiller.get_shared_connections();
let (fut, refiller_handle) = refiller.run(use_keyspace_request_receiver).remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

let keepaliver_handle = if let Some(interval) = keepalive_interval {
let keepaliver = Keepaliver {
Expand All @@ -222,7 +223,7 @@ impl NodeConnectionPool {
};

let (fut, keepaliver_handle) = keepaliver.work().remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

Some(keepaliver_handle)
} else {
Expand Down Expand Up @@ -1221,14 +1222,17 @@ impl PoolRefiller {
Err(QueryError::IoError(io_error.unwrap()))
};

tokio::task::spawn(async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
tokio::task::spawn(
async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
}
let _ = response_sender.send(res);
}
let _ = response_sender.send(res);
});
.with_current_subscriber(),
);
}

// Requires the keyspace to be set
Expand Down
Loading

0 comments on commit dc1c83e

Please sign in to comment.