Skip to content

Commit

Permalink
Fix false contention in mutex table (MystenLabs#3858)
Browse files: browse the repository at this point in the history
  • Loading branch information
sadhansood authored Aug 10, 2022
1 parent a26d836 commit 018dcec
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 43 deletions.
18 changes: 7 additions & 11 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ pub type GatewayStore = SuiDataStore<EmptySignInfo>;

pub type InternalSequenceNumber = u64;

type CertLockGuard<'a> = LockGuard<'a>;
pub struct CertLockGuard(LockGuard);

const NUM_SHARDS: usize = 4096;
const SHARD_SIZE: usize = 128;

/// The key where the latest consensus index is stored in the database.
// TODO: Make a single table (e.g., called `variables`) storing all our lonely variables in one place.
Expand Down Expand Up @@ -90,10 +91,8 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

Self {
wal,

lock_service,
mutex_table: MutexTable::new(NUM_SHARDS),

mutex_table: MutexTable::new(NUM_SHARDS, SHARD_SIZE),
next_pending_seq,
pending_notifier: Arc::new(Notify::new()),
tables,
Expand Down Expand Up @@ -125,11 +124,8 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// Acquire the lock for a tx without writing to the WAL.
pub async fn acquire_tx_lock<'a, 'b>(
&'a self,
digest: &'b TransactionDigest,
) -> CertLockGuard<'a> {
self.wal.acquire_lock(digest).await
pub async fn acquire_tx_lock<'a, 'b>(&'a self, digest: &'b TransactionDigest) -> CertLockGuard {
CertLockGuard(self.wal.acquire_lock(digest).await)
}

// TODO: Async retry method, using tokio-retry crate.
Expand Down Expand Up @@ -271,9 +267,9 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
async fn acquire_locks<'a, 'b>(&'a self, input_objects: &'b [ObjectRef]) -> Vec<LockGuard<'a>> {
async fn acquire_locks<'a, 'b>(&'a self, input_objects: &'b [ObjectRef]) -> Vec<LockGuard> {
self.mutex_table
.acquire_locks(input_objects.iter().map(|(_, _, digest)| digest))
.acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
.await
}

Expand Down
51 changes: 30 additions & 21 deletions crates/sui-storage/src/mutex_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::{DefaultHasher, RandomState};
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;

use tokio::sync::{Mutex, RwLock};

// MutexTable supports mutual exclusion on keys such as TransactionDigest or ObjectDigest
pub struct MutexTable<K: Hash> {
random_state: RandomState,
lock_table: Vec<tokio::sync::Mutex<()>>,
lock_table: Vec<RwLock<HashMap<K, Arc<tokio::sync::Mutex<()>>>>>,
_k: std::marker::PhantomData<K>,
}

// Opaque struct to hide tokio::sync::MutexGuard.
pub struct LockGuard<'a>(tokio::sync::MutexGuard<'a, ()>);
pub struct LockGuard(tokio::sync::OwnedMutexGuard<()>);

impl<'b, K: Hash + 'b> MutexTable<K> {
pub fn new(size: usize) -> Self {
impl<'b, K: Hash + std::cmp::Eq + 'b> MutexTable<K> {
pub fn new(num_shards: usize, shard_size: usize) -> Self {
Self {
random_state: RandomState::new(),
lock_table: (0..size)
lock_table: (0..num_shards)
.into_iter()
.map(|_| tokio::sync::Mutex::new(()))
.map(|_| RwLock::new(HashMap::with_capacity(shard_size)))
.collect(),
_k: std::marker::PhantomData {},
}
Expand All @@ -40,27 +44,32 @@ impl<'b, K: Hash + 'b> MutexTable<K> {
hash % self.lock_table.len()
}

pub async fn acquire_locks<'a, I>(&'a self, object_iter: I) -> Vec<LockGuard<'a>>
pub async fn acquire_locks<I>(&self, object_iter: I) -> Vec<LockGuard>
where
I: IntoIterator<Item = &'b K>,
I: Iterator<Item = K>,
{
let mut locks: Vec<usize> = object_iter
.into_iter()
.map(|o| self.get_lock_idx(o))
.collect();

locks.sort_unstable();
locks.dedup();
let mut objects: Vec<K> = object_iter.into_iter().collect();
objects.sort_by_key(|a| self.get_lock_idx(a));
objects.dedup();

let mut guards = Vec::with_capacity(locks.len());
for lock_idx in locks {
guards.push(LockGuard(self.lock_table[lock_idx].lock().await));
let mut guards = Vec::with_capacity(objects.len());
for object in objects.into_iter() {
guards.push(self.acquire_lock(object).await);
}
guards
}

pub async fn acquire_lock<'a, 'key>(&'a self, k: &'key K) -> LockGuard<'a> {
let lock_idx = self.get_lock_idx(k);
LockGuard(self.lock_table[lock_idx].lock().await)
pub async fn acquire_lock(&self, k: K) -> LockGuard {
let lock_idx = self.get_lock_idx(&k);
let map = self.lock_table[lock_idx].read().await;
if let Some(element) = map.get(&k) {
LockGuard(element.clone().lock_owned().await)
} else {
// element doesn't exist
drop(map);
let mut map = self.lock_table[lock_idx].write().await;
let element = map.entry(k).or_insert_with(|| Arc::new(Mutex::new(())));
LockGuard(element.clone().lock_owned().await)
}
}
}
19 changes: 8 additions & 11 deletions crates/sui-storage/src/write_ahead_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub trait WriteAheadLog<'a, C> {

pub struct DBTxGuard<'a, C: Serialize + DeserializeOwned + Debug> {
tx: TransactionDigest,
_mutex_guard: LockGuard<'a>,
_mutex_guard: LockGuard,
wal: &'a DBWriteAheadLog<C>,
dead: bool,
}
Expand All @@ -101,11 +101,7 @@ impl<'a, C> DBTxGuard<'a, C>
where
C: Serialize + DeserializeOwned + Debug,
{
fn new(
tx: &TransactionDigest,
_mutex_guard: LockGuard<'a>,
wal: &'a DBWriteAheadLog<C>,
) -> Self {
fn new(tx: &TransactionDigest, _mutex_guard: LockGuard, wal: &'a DBWriteAheadLog<C>) -> Self {
Self {
tx: *tx,
_mutex_guard,
Expand Down Expand Up @@ -173,6 +169,7 @@ pub struct DBWriteAheadLog<C> {
}

const MUTEX_TABLE_SIZE: usize = 1024;
const MUTEX_TABLE_SHARD_SIZE: usize = 128;

impl<C> DBWriteAheadLog<C>
where
Expand All @@ -193,7 +190,7 @@ where
Self {
tables,
recoverable_txes: Mutex::new(recoverable_txes),
mutex_table: MutexTable::new(MUTEX_TABLE_SIZE),
mutex_table: MutexTable::new(MUTEX_TABLE_SIZE, MUTEX_TABLE_SHARD_SIZE),
}
}

Expand Down Expand Up @@ -251,7 +248,7 @@ where
C: Serialize + DeserializeOwned + std::marker::Send + std::marker::Sync + Debug,
{
type Guard = DBTxGuard<'a, C>;
type LockGuard = LockGuard<'a>;
type LockGuard = LockGuard;

#[must_use]
#[instrument(level = "debug", name = "begin_tx", skip_all)]
Expand All @@ -260,7 +257,7 @@ where
tx: &'b TransactionDigest,
cert: &'b C,
) -> SuiResult<Option<DBTxGuard<'a, C>>> {
let mutex_guard = self.mutex_table.acquire_lock(tx).await;
let mutex_guard = self.mutex_table.acquire_lock(*tx).await;
trace!(digest = ?tx, "acquired tx lock");

if self.tables.log.contains_key(tx)? {
Expand All @@ -281,7 +278,7 @@ where

#[must_use]
async fn acquire_lock(&'a self, tx: &TransactionDigest) -> Self::LockGuard {
let res = self.mutex_table.acquire_lock(tx).await;
let res = self.mutex_table.acquire_lock(*tx).await;
trace!(digest = ?tx, "acquired tx lock");
res
}
Expand All @@ -293,7 +290,7 @@ where
match candidate {
None => None,
Some(digest) => {
let guard = self.mutex_table.acquire_lock(&digest).await;
let guard = self.mutex_table.acquire_lock(digest).await;
Some(DBTxGuard::new(&digest, guard, self))
}
}
Expand Down

0 comments on commit 018dcec

Please sign in to comment.