Skip to content

Commit

Permalink
batch: Replace BatchResult with QueryResult
Browse files Browse the repository at this point in the history
BatchResult is a struct used to represent
the result of a batch query. The difference between
BatchResult and QueryResult is that BatchResult
doesn't contain any rows. Batches only
support INSERT and UPDATE statements,
so it made sense to create a type with
stricter constraints that could be
used to represent this rowless result.

It turns out that in certain cases batches
do return rows.
For batches with LWT queries, like this one:
```
BEGIN BATCH
UPDATE ks.data set version = 1 WHERE key = 'x1a' and client = 0x01 IF version = 0;
UPDATE ks.data set version = 1 WHERE key = 'x1b' and client = 0x01 IF version = 0;
APPLY BATCH;
```

The database returns rows with information about the changes performed:
```
 [applied] | client | key | version
-----------+--------+-----+---------
     False |   0x01 | x1a |       1
     False |   0x01 | x1b |       0
```

It wasn't possible to access these rows because BatchResult
had no rows, as described in scylladb#554.

The solution is to remove BatchResult
and use QueryResult instead. Batch is now
like any other query which might return rows.

There's one field of QueryResult that
doesn't make sense for batches: `paging_state`,
but we can ignore it.
QueryResult has a ton of convenience methods
that can be used to access the received rows,
we would have to duplicate them all for BatchResult.
I think we can have one useless field, this much
duplication isn't worth it.

At first I thought that maybe we could just expose
a list of [applied] bools, not rows, but this
isn't enough.
There are other columns that tell us the values
of columns involved in the LWT. This might be
useful to some users, we can't just discard it.

This change brings us a step closer to implementing scylladb#100

Signed-off-by: Jan Ciolek <[email protected]>
  • Loading branch information
cvybhu committed Sep 21, 2022
1 parent 7adeaee commit 1bfc5c1
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 75 deletions.
3 changes: 1 addition & 2 deletions docs/source/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@
- [Logging](logging/logging.md)

- [Query tracing](tracing/tracing.md)
- [Tracing a simple/prepared query](tracing/simple-prepared.md)
- [Tracing a batch query](tracing/batch.md)
- [Tracing a simple/prepared query](tracing/basic.md)
- [Tracing a paged query](tracing/paged.md)
- [Tracing `Session::prepare`](tracing/prepare.md)
- [Query Execution History](tracing/query-history.md)
Expand Down
4 changes: 2 additions & 2 deletions docs/source/queries/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A batch statement allows to run many queries at once.\
These queries can be [simple queries](simple.md) or [prepared queries](prepared.md).\
Only queries like `INSERT` or `UPDATE` can be in a batch, batch doesn't return any rows.
Only queries like `INSERT` or `UPDATE` can be in a batch.

```rust
# extern crate scylla;
Expand Down Expand Up @@ -34,7 +34,7 @@ let batch_values = ((1_i32, 2_i32),
(),
(5_i32,));

// Run the batch, doesn't return any rows
// Run the batch
session.batch(&batch, batch_values).await?;
# Ok(())
# }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Tracing a simple/prepared query
# Tracing a simple/prepared/batch query

Both [simple query](../queries/simple.md) and [prepared query](../queries/prepared.md)
[Simple query](../queries/simple.md), [prepared query](../queries/prepared.md) and [batch query](../queries/batch.md)
return a `QueryResult` which contains a `tracing_id` if tracing was enabled.

### Tracing a simple query
Expand Down Expand Up @@ -62,3 +62,34 @@ if let Some(id) = tracing_id {
# Ok(())
# }
```

### Tracing a batch query
```rust
# extern crate scylla;
# extern crate uuid;
# use scylla::Session;
# use std::error::Error;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::batch::Batch;
use scylla::QueryResult;
use scylla::tracing::TracingInfo;
use uuid::Uuid;

// Create a batch statement
let mut batch: Batch = Default::default();
batch.append_statement("INSERT INTO ks.tab (a) VALUES(4)");

// Enable tracing
batch.set_tracing(true);

let res: QueryResult = session.batch(&batch, ((),)).await?;
let tracing_id: Option<Uuid> = res.tracing_id;

if let Some(id) = tracing_id {
// Query tracing info from system_traces.sessions and system_traces.events
let tracing_info: TracingInfo = session.get_tracing_info(&id).await?;
println!("tracing_info: {:#?}", tracing_info);
}
# Ok(())
# }
```
32 changes: 0 additions & 32 deletions docs/source/tracing/batch.md

This file was deleted.

9 changes: 4 additions & 5 deletions docs/source/tracing/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ Tracing is a feature provided by Scylla. When sending a query we can set a flag
After completing the query Scylla provides a `tracing_id` which can be used to fetch information about it - which nodes it was sent to, what operations were performed etc.

Queries that support tracing:
* [`Session::query()`](simple-prepared.md)
* [`Session::query()`](basic.md)
* [`Session::query_iter()`](paged.md)
* [`Session::execute()`](simple-prepared.md)
* [`Session::execute()`](basic.md)
* [`Session::execute_iter()`](paged.md)
* [`Session::batch()`](batch.md)
* [`Session::batch()`](basic.md)
* [`Session::prepare()`](prepare.md)

After obtaining the tracing id you can use `Session::get_tracing_info()` to query tracing information.\
Expand All @@ -34,8 +34,7 @@ More information is available in the [Query Execution History](query-history.md)
:hidden:
:glob:
simple-prepared
batch
basic
paged
prepare
query-history
Expand Down
4 changes: 2 additions & 2 deletions examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use scylla::statement::{
};
use scylla::tracing::{GetTracingConfig, TracingInfo};
use scylla::transport::iterator::RowIterator;
use scylla::{BatchResult, QueryResult};
use scylla::QueryResult;
use scylla::{Session, SessionBuilder};
use std::env;
use std::num::NonZeroU32;
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn main() -> Result<()> {
batch.set_tracing(true);

// Run the batch and print its tracing_id
let batch_result: BatchResult = session.batch(&batch, ((),)).await?;
let batch_result: QueryResult = session.batch(&batch, ((),)).await?;
println!("Batch tracing id: {:?}\n", batch_result.tracing_id);

// CUSTOM
Expand Down
1 change: 0 additions & 1 deletion scylla/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ pub use frame::response::cql_to_rust;
pub use frame::response::cql_to_rust::FromRow;

pub use transport::caching_session::CachingSession;
pub use transport::connection::BatchResult;
pub use transport::query_result::QueryResult;
pub use transport::session::{IntoTypedRows, Session, SessionConfig};
pub use transport::session_builder::SessionBuilder;
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/statement/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Batch {
}

/// Enable or disable CQL Tracing for this batch
/// If enabled session.batch() will return a BatchResult containing tracing_id
/// If enabled session.batch() will return a QueryResult containing tracing_id
/// which can be used to query tracing information about the execution of this query
pub fn set_tracing(&mut self, should_trace: bool) {
self.config.tracing = should_trace;
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/transport/caching_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::transport::errors::QueryError;
use crate::transport::iterator::RowIterator;
use crate::{BatchResult, QueryResult, Session};
use crate::{QueryResult, Session};
use bytes::Bytes;
use dashmap::DashMap;
use futures::future::try_join_all;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl CachingSession {
&self,
batch: &Batch,
values: impl BatchValues,
) -> Result<BatchResult, QueryError> {
) -> Result<QueryResult, QueryError> {
let all_prepared: bool = batch
.statements
.iter()
Expand Down
17 changes: 3 additions & 14 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,6 @@ pub struct NonErrorQueryResponse {
pub warnings: Vec<String>,
}

/// Result of Session::batch(). Contains no rows, only some useful information.
pub struct BatchResult {
/// Warnings returned by the database
pub warnings: Vec<String>,
/// CQL Tracing uuid - can only be Some if tracing is enabled for this batch
pub tracing_id: Option<Uuid>,
}

impl QueryResponse {
pub fn into_non_error_query_response(self) -> Result<NonErrorQueryResponse, QueryError> {
Ok(NonErrorQueryResponse {
Expand Down Expand Up @@ -632,7 +624,7 @@ impl Connection {
&self,
batch: &Batch,
values: impl BatchValues,
) -> Result<BatchResult, QueryError> {
) -> Result<QueryResult, QueryError> {
self.batch_with_consistency(
batch,
values,
Expand All @@ -648,7 +640,7 @@ impl Connection {
batch: &Batch,
values: impl BatchValues,
consistency: Consistency,
) -> Result<BatchResult, QueryError> {
) -> Result<QueryResult, QueryError> {
let statements_count = batch.statements.len();
if statements_count != values.len() {
return Err(QueryError::BadQuery(BadQuery::ValueLenMismatch(
Expand Down Expand Up @@ -700,10 +692,7 @@ impl Connection {
}
_ => Err(err.into()),
},
Response::Result(_) => Ok(BatchResult {
warnings: query_response.warnings,
tracing_id: query_response.tracing_id,
}),
Response::Result(_) => Ok(query_response.into_query_result()?),
_ => Err(QueryError::ProtocolError(
"BATCH: Unexpected server response",
)),
Expand Down
13 changes: 4 additions & 9 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ use crate::routing::Token;
use crate::statement::{Consistency, SerialConsistency};
use crate::tracing::{GetTracingConfig, TracingEvent, TracingInfo};
use crate::transport::cluster::{Cluster, ClusterData, ClusterNeatDebug};
use crate::transport::connection::{
BatchResult, Connection, ConnectionConfig, VerifiedKeyspaceName,
};
use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName};
use crate::transport::connection_pool::PoolConfig;
use crate::transport::iterator::{PreparedIteratorConfig, RowIterator};
use crate::transport::load_balancing::{
Expand Down Expand Up @@ -1007,7 +1005,7 @@ impl Session {
&self,
batch: &Batch,
values: impl BatchValues,
) -> Result<BatchResult, QueryError> {
) -> Result<QueryResult, QueryError> {
let values_ref = &values;

let run_query_result = self
Expand All @@ -1025,10 +1023,7 @@ impl Session {
.await?;

Ok(match run_query_result {
RunQueryResult::IgnoredWriteError => BatchResult {
tracing_id: None,
warnings: Vec::new(),
},
RunQueryResult::IgnoredWriteError => QueryResult::default(),
RunQueryResult::Completed(response) => response,
})
}
Expand Down Expand Up @@ -1682,7 +1677,7 @@ async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, NewSessionError>
pub trait AllowedRunQueryResTType {}

impl AllowedRunQueryResTType for Uuid {}
impl AllowedRunQueryResTType for BatchResult {}
impl AllowedRunQueryResTType for QueryResult {}
impl AllowedRunQueryResTType for NonErrorQueryResponse {}

struct ExecuteQueryContext<'a> {
Expand Down
5 changes: 2 additions & 3 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::query::Query;
use crate::routing::Token;
use crate::statement::Consistency;
use crate::tracing::TracingInfo;
use crate::transport::connection::BatchResult;
use crate::transport::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
use crate::transport::partitioner::{Murmur3Partitioner, Partitioner, PartitionerName};
use crate::transport::topology::Strategy::SimpleStrategy;
Expand Down Expand Up @@ -1011,15 +1010,15 @@ async fn test_tracing_batch(session: &Session, ks: String) {
let mut untraced_batch: Batch = Default::default();
untraced_batch.append_statement(&format!("INSERT INTO {}.tab (a) VALUES('a')", ks)[..]);

let untraced_batch_result: BatchResult = session.batch(&untraced_batch, ((),)).await.unwrap();
let untraced_batch_result: QueryResult = session.batch(&untraced_batch, ((),)).await.unwrap();
assert!(untraced_batch_result.tracing_id.is_none());

// Batch with tracing enabled has a tracing uuid in result
let mut traced_batch: Batch = Default::default();
traced_batch.append_statement(&format!("INSERT INTO {}.tab (a) VALUES('a')", ks)[..]);
traced_batch.config.tracing = true;

let traced_batch_result: BatchResult = session.batch(&traced_batch, ((),)).await.unwrap();
let traced_batch_result: QueryResult = session.batch(&traced_batch, ((),)).await.unwrap();
assert!(traced_batch_result.tracing_id.is_some());

assert_in_tracing_table(session, traced_batch_result.tracing_id.unwrap()).await;
Expand Down

0 comments on commit 1bfc5c1

Please sign in to comment.