Skip to content

Commit

Permalink
Fix cancellation of TransactionBuilder::start
Browse files Browse the repository at this point in the history
  • Loading branch information
sfackler committed Jul 22, 2024
1 parent a1bdd0b commit a0b2d70
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
42 changes: 3 additions & 39 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::{BackendMessages, FrontendMessage};
use crate::codec::BackendMessages;
use crate::config::SslMode;
use crate::connection::{Request, RequestMessages};
use crate::copy_out::CopyOutStream;
Expand All @@ -21,7 +21,7 @@ use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::{backend::Message, frontend};
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
Expand Down Expand Up @@ -532,43 +532,7 @@ impl Client {
///
/// The transaction will roll back by default - use the `commit` method to commit it.
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
struct RollbackIfNotDone<'me> {
client: &'me Client,
done: bool,
}

impl<'a> Drop for RollbackIfNotDone<'a> {
fn drop(&mut self) {
if self.done {
return;
}

let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}
}

// This is done, as `Future` created by this method can be dropped after
// `RequestMessages` is synchronously send to the `Connection` by
// `batch_execute()`, but before `Responses` is asynchronously polled to
// completion. In that case `Transaction` won't be created and thus
// won't be rolled back.
{
let mut cleaner = RollbackIfNotDone {
client: self,
done: false,
};
self.batch_execute("BEGIN").await?;
cleaner.done = true;
}

Ok(Transaction::new(self))
self.build_transaction().start().await
}

/// Returns a builder for a transaction with custom settings.
Expand Down
40 changes: 38 additions & 2 deletions tokio-postgres/src/transaction_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{Client, Error, Transaction};
use postgres_protocol::message::frontend;

use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};

/// The isolation level of a database transaction.
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -106,7 +108,41 @@ impl<'a> TransactionBuilder<'a> {
query.push_str(s);
}

self.client.batch_execute(&query).await?;
struct RollbackIfNotDone<'me> {
client: &'me Client,
done: bool,
}

impl<'a> Drop for RollbackIfNotDone<'a> {
fn drop(&mut self) {
if self.done {
return;
}

let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}
}

// This is done as `Future` created by this method can be dropped after
// `RequestMessages` is synchronously send to the `Connection` by
// `batch_execute()`, but before `Responses` is asynchronously polled to
// completion. In that case `Transaction` won't be created and thus
// won't be rolled back.
{
let mut cleaner = RollbackIfNotDone {
client: self.client,
done: false,
};
self.client.batch_execute(&query).await?;
cleaner.done = true;
}

Ok(Transaction::new(self.client))
}
Expand Down

0 comments on commit a0b2d70

Please sign in to comment.