Skip to content

Commit

Permalink
Make TxGuard also hold the tx lock (MystenLabs#2410)
Browse files Browse the repository at this point in the history
* Use a tuple struct for succinctness

* Make TxGuard also hold the tx lock, so that concurrent identical txes are excluded for the duration of processing
  • Loading branch information
mystenmark authored Jun 7, 2022
1 parent 6bded91 commit 3cb71dc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 50 deletions.
12 changes: 3 additions & 9 deletions crates/sui-storage/src/mutex_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ pub struct MutexTable<K: Hash> {
}

// Opaque struct to hide tokio::sync::MutexGuard.
pub struct LockGuard<'a> {
_guard: tokio::sync::MutexGuard<'a, ()>,
}
pub struct LockGuard<'a>(tokio::sync::MutexGuard<'a, ()>);

impl<'b, K: Hash + 'b> MutexTable<K> {
pub fn new(size: usize) -> Self {
Expand Down Expand Up @@ -56,17 +54,13 @@ impl<'b, K: Hash + 'b> MutexTable<K> {

let mut guards = Vec::with_capacity(locks.len());
for lock_idx in locks {
guards.push(LockGuard {
_guard: self.lock_table[lock_idx].lock().await,
});
guards.push(LockGuard(self.lock_table[lock_idx].lock().await));
}
guards
}

pub async fn acquire_lock<'a>(&'a self, k: &K) -> LockGuard<'a> {
let lock_idx = self.get_lock_idx(k);
LockGuard {
_guard: self.lock_table[lock_idx].lock().await,
}
LockGuard(self.lock_table[lock_idx].lock().await)
}
}
108 changes: 67 additions & 41 deletions crates/sui-storage/src/write_ahead_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use serde::{de::DeserializeOwned, Serialize};
use std::path::Path;
use std::sync::Mutex;

use crate::{default_db_options, mutex_table::MutexTable};
use crate::{
default_db_options,
mutex_table::{LockGuard, MutexTable},
};
use sui_types::base_types::TransactionDigest;

use sui_types::error::{SuiError, SuiResult};
Expand All @@ -39,10 +42,20 @@ pub trait TxGuard<'a>: Drop {
pub trait WriteAheadLog<'a, C> {
type Guard: TxGuard<'a>;

/// Begin a confirmation transaction identified by its digest, with the associated cert
/// Begin a confirmation transaction identified by its digest, with the associated cert.
///
/// If a transaction with the given digest is already in progress, return None.
/// Otherwise return a TxGuard, which is used to commit the tx.
/// The possible return values mean:
///
/// Ok(None) => There was a concurrent instance of the same tx in progress, but it ended
/// witout being committed. The caller may not proceed processing that tx. A TxGuard for
/// that tx can be (eventually) obtained by calling pop_one_recoverable_tx().
///
/// Ok(Some(TxGuard)) => No other concurrent instance of the same tx is in progress, nor can
/// one start while the guard is held. However, a prior instance of the same tx could have
/// just finished, so the caller may want to check if the tx is already sequenced before
/// proceeding.
///
/// Err(e) => An error occurred.
#[must_use]
async fn begin_tx(&'a self, tx: &TransactionDigest, cert: &C)
-> SuiResult<Option<Self::Guard>>;
Expand All @@ -51,15 +64,14 @@ pub trait WriteAheadLog<'a, C> {
/// while processing them) or implicitly dropped TXes (which can happen because we errored
/// out of the write path and implicitly dropped the TxGuard).
///
/// This method takes and clears the current recoverable txes list.
/// A vector of Guard is returned, which means the txes will return to the recoverable_txes
/// list if not explicitly committed.
/// This method pops one recoverable tx from that log, acquires the lock for that tx,
/// and returns a TxGuard.
///
/// The caller is responsible for running each tx to completion.
/// The caller is responsible for running the tx to completion.
///
/// Recoverable TXes will remain in the on-disk log until they are explicitly committed.
#[must_use]
fn take_recoverable_txes(&'a self) -> Vec<Self::Guard>;
async fn pop_one_recoverable_tx(&'a self) -> Option<Self::Guard>;

/// Get the data associated with a given digest - returns an error if no such transaction is
/// currently open.
Expand All @@ -69,6 +81,7 @@ pub trait WriteAheadLog<'a, C> {

pub struct DBTxGuard<'a, C: Serialize + DeserializeOwned> {
tx: TransactionDigest,
_mutex_guard: LockGuard<'a>,
wal: &'a DBWriteAheadLog<C>,
dead: bool,
}
Expand All @@ -77,9 +90,14 @@ impl<'a, C> DBTxGuard<'a, C>
where
C: Serialize + DeserializeOwned,
{
fn new(tx: &TransactionDigest, wal: &'a DBWriteAheadLog<C>) -> Self {
fn new(
tx: &TransactionDigest,
_mutex_guard: LockGuard<'a>,
wal: &'a DBWriteAheadLog<C>,
) -> Self {
Self {
tx: *tx,
_mutex_guard,
wal,
dead: false,
}
Expand Down Expand Up @@ -118,6 +136,7 @@ pub struct DBWriteAheadLog<C> {
log: DBMap<TransactionDigest, C>,

// Can't use tokio Mutex - must be accessible synchronously from drop trait.
// Only acquire this lock in sync functions to make sure we don't hold it across an await.
recoverable_txes: Mutex<Vec<TransactionDigest>>,

// Guards the get/set in begin_tx
Expand Down Expand Up @@ -170,6 +189,12 @@ where
let mut r = self.recoverable_txes.lock().unwrap();
r.push(*tx);
}

fn pop_one_tx(&self) -> Option<TransactionDigest> {
// Only acquire this lock inside a sync function to make sure we don't accidentally
// hold it across an .await
self.recoverable_txes.lock().unwrap().pop()
}
}

#[async_trait]
Expand All @@ -185,32 +210,35 @@ where
tx: &TransactionDigest,
cert: &C,
) -> SuiResult<Option<DBTxGuard<'a, C>>> {
let _mutex_guard = self.mutex_table.acquire_lock(tx).await;
let mutex_guard = self.mutex_table.acquire_lock(tx).await;

if self.log.contains_key(tx)? {
// We return None instead of a guard, to signal that a tx with this digest is already
// in progress.
// A concurrent tx will have held the mutex guard until it finished. If the tx is
// committed it is removed from the log. This means that if the tx is still in the
// log, it was dropped (errored out) and not committed. Return None to indicate
// that the caller does not hold a guard on this tx and cannot proceed.
//
// TODO: It may turn out to be better to hold the lock until the other guard is
// dropped - this should become clear once this code is being used.
// (The dropped tx must be retried later by calling pop_one_recoverable_tx() and
// obtaining a TxGuard).
return Ok(None);
}

self.log.insert(tx, cert)?;

Ok(Some(DBTxGuard::new(tx, self)))
Ok(Some(DBTxGuard::new(tx, mutex_guard, self)))
}

#[must_use]
fn take_recoverable_txes(&'a self) -> Vec<DBTxGuard<'a, C>> {
// unwrap ok because we should absolutely crash if the mutex is poisoned
let mut v = self.recoverable_txes.lock().unwrap();
let mut new = Vec::new();
std::mem::swap(&mut *v, &mut new);
let ret = new;
ret.iter()
.map(|digest| DBTxGuard::new(digest, self))
.collect()
async fn pop_one_recoverable_tx(&'a self) -> Option<DBTxGuard<'a, C>> {
let candidate = self.pop_one_tx();

match candidate {
None => None,
Some(digest) => {
let guard = self.mutex_table.acquire_lock(&digest).await;
Some(DBTxGuard::new(&digest, guard, self))
}
}
}

fn get_tx_data(&self, g: &DBTxGuard<'a, C>) -> SuiResult<C> {
Expand All @@ -228,6 +256,10 @@ mod tests {
use anyhow;
use sui_types::base_types::TransactionDigest;

async fn recover_queue_empty(log: &DBWriteAheadLog<u32>) -> bool {
log.pop_one_recoverable_tx().await.is_none()
}

#[tokio::test]
async fn test_write_ahead_log() -> Result<(), anyhow::Error> {
let working_dir = tempfile::tempdir()?;
Expand All @@ -238,8 +270,7 @@ mod tests {

{
let log: DBWriteAheadLog<u32> = DBWriteAheadLog::new(&working_dir);
let r = log.take_recoverable_txes();
assert!(r.is_empty());
assert!(recover_queue_empty(&log).await);

let tx1 = log.begin_tx(&tx1_id, &1).await?.unwrap();
tx1.commit_tx().unwrap();
Expand All @@ -252,38 +283,33 @@ mod tests {
// implicit drop
}

let r = log.take_recoverable_txes();
let r = log.pop_one_recoverable_tx().await.unwrap();
// tx3 in recoverable txes because we dropped the guard.
assert_eq!(r.len(), 1);
assert_eq!(r[0].tx_id(), tx3_id);
assert_eq!(r.tx_id(), tx3_id);

// verify previous call emptied the recoverable list
let r_empty = log.take_recoverable_txes();
assert!(r_empty.is_empty());
assert!(recover_queue_empty(&log).await);
}

{
// recover the log
let log: DBWriteAheadLog<u32> = DBWriteAheadLog::new(&working_dir);

// recoverable txes still there
let mut r = log.take_recoverable_txes();
assert_eq!(r.len(), 1);
let g = r.pop().unwrap();
assert_eq!(g.tx_id(), tx3_id);

assert_eq!(log.get_tx_data(&g).unwrap(), 3);
let r = log.pop_one_recoverable_tx().await.unwrap();
assert_eq!(r.tx_id(), tx3_id);
assert_eq!(log.get_tx_data(&r).unwrap(), 3);
assert!(recover_queue_empty(&log).await);

// commit the recoverable tx
g.commit_tx().unwrap();
r.commit_tx().unwrap();
}

{
// recover the log again
let log: DBWriteAheadLog<u32> = DBWriteAheadLog::new(&working_dir);
let r = log.take_recoverable_txes();
// empty, because we committed the tx before.
assert!(r.is_empty());
assert!(recover_queue_empty(&log).await);
}

Ok(())
Expand Down

0 comments on commit 3cb71dc

Please sign in to comment.