From 3cb71dc619e6c7b1885dca73fb5967c2ce942265 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Tue, 7 Jun 2022 09:08:15 -0700 Subject: [PATCH] Make TxGuard also hold the tx lock (#2410) * Use a tuple struct for succinctness * Make TxGuard also hold the tx lock, so that concurrent identical txes are excluded for the duration of processing --- crates/sui-storage/src/mutex_table.rs | 12 +-- crates/sui-storage/src/write_ahead_log.rs | 108 ++++++++++++++-------- 2 files changed, 70 insertions(+), 50 deletions(-) diff --git a/crates/sui-storage/src/mutex_table.rs b/crates/sui-storage/src/mutex_table.rs index 6baa6bf3debaf..7a4162157ecba 100644 --- a/crates/sui-storage/src/mutex_table.rs +++ b/crates/sui-storage/src/mutex_table.rs @@ -12,9 +12,7 @@ pub struct MutexTable { } // Opaque struct to hide tokio::sync::MutexGuard. -pub struct LockGuard<'a> { - _guard: tokio::sync::MutexGuard<'a, ()>, -} +pub struct LockGuard<'a>(tokio::sync::MutexGuard<'a, ()>); impl<'b, K: Hash + 'b> MutexTable { pub fn new(size: usize) -> Self { @@ -56,17 +54,13 @@ impl<'b, K: Hash + 'b> MutexTable { let mut guards = Vec::with_capacity(locks.len()); for lock_idx in locks { - guards.push(LockGuard { - _guard: self.lock_table[lock_idx].lock().await, - }); + guards.push(LockGuard(self.lock_table[lock_idx].lock().await)); } guards } pub async fn acquire_lock<'a>(&'a self, k: &K) -> LockGuard<'a> { let lock_idx = self.get_lock_idx(k); - LockGuard { - _guard: self.lock_table[lock_idx].lock().await, - } + LockGuard(self.lock_table[lock_idx].lock().await) } } diff --git a/crates/sui-storage/src/write_ahead_log.rs b/crates/sui-storage/src/write_ahead_log.rs index bfc7cb76caa4f..6a26ccf8ea6af 100644 --- a/crates/sui-storage/src/write_ahead_log.rs +++ b/crates/sui-storage/src/write_ahead_log.rs @@ -12,7 +12,10 @@ use serde::{de::DeserializeOwned, Serialize}; use std::path::Path; use std::sync::Mutex; -use crate::{default_db_options, mutex_table::MutexTable}; +use crate::{ + default_db_options, + mutex_table::{LockGuard, MutexTable}, +}; use sui_types::base_types::TransactionDigest; use sui_types::error::{SuiError, SuiResult}; @@ -39,10 +42,20 @@ pub trait TxGuard<'a>: Drop { pub trait WriteAheadLog<'a, C> { type Guard: TxGuard<'a>; - /// Begin a confirmation transaction identified by its digest, with the associated cert + /// Begin a confirmation transaction identified by its digest, with the associated cert. /// - /// If a transaction with the given digest is already in progress, return None. - /// Otherwise return a TxGuard, which is used to commit the tx. + /// The possible return values mean: + /// + /// Ok(None) => There was a concurrent instance of the same tx in progress, but it ended + /// witout being committed. The caller may not proceed processing that tx. A TxGuard for + /// that tx can be (eventually) obtained by calling pop_one_recoverable_tx(). + /// + /// Ok(Some(TxGuard)) => No other concurrent instance of the same tx is in progress, nor can + /// one start while the guard is held. However, a prior instance of the same tx could have + /// just finished, so the caller may want to check if the tx is already sequenced before + /// proceeding. + /// + /// Err(e) => An error occurred. #[must_use] async fn begin_tx(&'a self, tx: &TransactionDigest, cert: &C) -> SuiResult>; @@ -51,15 +64,14 @@ pub trait WriteAheadLog<'a, C> { /// while processing them) or implicitly dropped TXes (which can happen because we errored /// out of the write path and implicitly dropped the TxGuard). /// - /// This method takes and clears the current recoverable txes list. - /// A vector of Guard is returned, which means the txes will return to the recoverable_txes - /// list if not explicitly committed. + /// This method pops one recoverable tx from that log, acquires the lock for that tx, + /// and returns a TxGuard. /// - /// The caller is responsible for running each tx to completion. + /// The caller is responsible for running the tx to completion. /// /// Recoverable TXes will remain in the on-disk log until they are explicitly committed. #[must_use] - fn take_recoverable_txes(&'a self) -> Vec; + async fn pop_one_recoverable_tx(&'a self) -> Option; /// Get the data associated with a given digest - returns an error if no such transaction is /// currently open. @@ -69,6 +81,7 @@ pub trait WriteAheadLog<'a, C> { pub struct DBTxGuard<'a, C: Serialize + DeserializeOwned> { tx: TransactionDigest, + _mutex_guard: LockGuard<'a>, wal: &'a DBWriteAheadLog, dead: bool, } @@ -77,9 +90,14 @@ impl<'a, C> DBTxGuard<'a, C> where C: Serialize + DeserializeOwned, { - fn new(tx: &TransactionDigest, wal: &'a DBWriteAheadLog) -> Self { + fn new( + tx: &TransactionDigest, + _mutex_guard: LockGuard<'a>, + wal: &'a DBWriteAheadLog, + ) -> Self { Self { tx: *tx, + _mutex_guard, wal, dead: false, } @@ -118,6 +136,7 @@ pub struct DBWriteAheadLog { log: DBMap, // Can't use tokio Mutex - must be accessible synchronously from drop trait. + // Only acquire this lock in sync functions to make sure we don't hold it across an await. recoverable_txes: Mutex>, // Guards the get/set in begin_tx @@ -170,6 +189,12 @@ where let mut r = self.recoverable_txes.lock().unwrap(); r.push(*tx); } + + fn pop_one_tx(&self) -> Option { + // Only acquire this lock inside a sync function to make sure we don't accidentally + // hold it across an .await + self.recoverable_txes.lock().unwrap().pop() + } } #[async_trait] @@ -185,32 +210,35 @@ where tx: &TransactionDigest, cert: &C, ) -> SuiResult>> { - let _mutex_guard = self.mutex_table.acquire_lock(tx).await; + let mutex_guard = self.mutex_table.acquire_lock(tx).await; if self.log.contains_key(tx)? { - // We return None instead of a guard, to signal that a tx with this digest is already - // in progress. + // A concurrent tx will have held the mutex guard until it finished. If the tx is + // committed it is removed from the log. This means that if the tx is still in the + // log, it was dropped (errored out) and not committed. Return None to indicate + // that the caller does not hold a guard on this tx and cannot proceed. // - // TODO: It may turn out to be better to hold the lock until the other guard is - // dropped - this should become clear once this code is being used. + // (The dropped tx must be retried later by calling pop_one_recoverable_tx() and + // obtaining a TxGuard). return Ok(None); } self.log.insert(tx, cert)?; - Ok(Some(DBTxGuard::new(tx, self))) + Ok(Some(DBTxGuard::new(tx, mutex_guard, self))) } #[must_use] - fn take_recoverable_txes(&'a self) -> Vec> { - // unwrap ok because we should absolutely crash if the mutex is poisoned - let mut v = self.recoverable_txes.lock().unwrap(); - let mut new = Vec::new(); - std::mem::swap(&mut *v, &mut new); - let ret = new; - ret.iter() - .map(|digest| DBTxGuard::new(digest, self)) - .collect() + async fn pop_one_recoverable_tx(&'a self) -> Option> { + let candidate = self.pop_one_tx(); + + match candidate { + None => None, + Some(digest) => { + let guard = self.mutex_table.acquire_lock(&digest).await; + Some(DBTxGuard::new(&digest, guard, self)) + } + } } fn get_tx_data(&self, g: &DBTxGuard<'a, C>) -> SuiResult { @@ -228,6 +256,10 @@ mod tests { use anyhow; use sui_types::base_types::TransactionDigest; + async fn recover_queue_empty(log: &DBWriteAheadLog) -> bool { + log.pop_one_recoverable_tx().await.is_none() + } + #[tokio::test] async fn test_write_ahead_log() -> Result<(), anyhow::Error> { let working_dir = tempfile::tempdir()?; @@ -238,8 +270,7 @@ mod tests { { let log: DBWriteAheadLog = DBWriteAheadLog::new(&working_dir); - let r = log.take_recoverable_txes(); - assert!(r.is_empty()); + assert!(recover_queue_empty(&log).await); let tx1 = log.begin_tx(&tx1_id, &1).await?.unwrap(); tx1.commit_tx().unwrap(); @@ -252,14 +283,12 @@ mod tests { // implicit drop } - let r = log.take_recoverable_txes(); + let r = log.pop_one_recoverable_tx().await.unwrap(); // tx3 in recoverable txes because we dropped the guard. - assert_eq!(r.len(), 1); - assert_eq!(r[0].tx_id(), tx3_id); + assert_eq!(r.tx_id(), tx3_id); // verify previous call emptied the recoverable list - let r_empty = log.take_recoverable_txes(); - assert!(r_empty.is_empty()); + assert!(recover_queue_empty(&log).await); } { @@ -267,23 +296,20 @@ mod tests { let log: DBWriteAheadLog = DBWriteAheadLog::new(&working_dir); // recoverable txes still there - let mut r = log.take_recoverable_txes(); - assert_eq!(r.len(), 1); - let g = r.pop().unwrap(); - assert_eq!(g.tx_id(), tx3_id); - - assert_eq!(log.get_tx_data(&g).unwrap(), 3); + let r = log.pop_one_recoverable_tx().await.unwrap(); + assert_eq!(r.tx_id(), tx3_id); + assert_eq!(log.get_tx_data(&r).unwrap(), 3); + assert!(recover_queue_empty(&log).await); // commit the recoverable tx - g.commit_tx().unwrap(); + r.commit_tx().unwrap(); } { // recover the log again let log: DBWriteAheadLog = DBWriteAheadLog::new(&working_dir); - let r = log.take_recoverable_txes(); // empty, because we committed the tx before. - assert!(r.is_empty()); + assert!(recover_queue_empty(&log).await); } Ok(())