Skip to content

Commit

Permalink
Cleanup mutex table periodically (MystenLabs#3872)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood authored Aug 11, 2022
1 parent 6173c3a commit 5ae5367
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 9 deletions.
187 changes: 181 additions & 6 deletions crates/sui-storage/src/mutex_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,94 @@

use std::collections::hash_map::{DefaultHasher, RandomState};
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tracing::{error, info};

// Map stored in a single shard of the table: every key owns its own shared async mutex.
type InnerLockTable<K> = HashMap<K, Arc<tokio::sync::Mutex<()>>>;
// MutexTable supports mutual exclusion on keys such as TransactionDigest or ObjectDigest
pub struct MutexTable<K: Hash> {
random_state: RandomState,
lock_table: Vec<RwLock<HashMap<K, Arc<tokio::sync::Mutex<()>>>>>,
lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>,
_k: std::marker::PhantomData<K>,
_cleaner: JoinHandle<()>,
stop: Arc<AtomicBool>,
}

/// Reason a non-blocking lock attempt could not complete immediately.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryAcquireLockError {
    /// The shard's `RwLock` could not be taken without waiting.
    LockTableLocked,
    /// The per-key mutex is currently held by someone else.
    LockEntryLocked,
}

impl fmt::Display for TryAcquireLockError {
    /// Renders the same fixed message for every variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("operation would block")
    }
}

// Marker impl: lets the error be used wherever a `std::error::Error` is expected.
// The required `Debug` and `Display` impls supply all formatting; no methods are overridden.
impl Error for TryAcquireLockError {}
/// Opaque struct to hide `tokio::sync::OwnedMutexGuard` from callers.
/// The key's lock is held for as long as this value is alive and released when it is dropped.
pub struct LockGuard(tokio::sync::OwnedMutexGuard<()>);

impl<'b, K: Hash + std::cmp::Eq + 'b> MutexTable<K> {
pub fn new(num_shards: usize, shard_size: usize) -> Self {
Self {
random_state: RandomState::new(),
lock_table: (0..num_shards)
impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
/// Builds a table with `num_shards` shards and starts its background cleanup task.
///
/// Every shard's map is pre-sized for `shard_size` entries. The task sleeps for
/// `cleanup_initial_delay`, then alternates one cleanup pass with a sleep of
/// `cleanup_period` until the stop flag is observed as set.
///
/// Uses `tokio::spawn`, so a Tokio runtime must be active when this is called.
pub fn new_with_cleanup(
    num_shards: usize,
    shard_size: usize,
    cleanup_period: Duration,
    cleanup_initial_delay: Duration,
) -> Self {
    let shards: Vec<RwLock<InnerLockTable<K>>> =
        std::iter::repeat_with(|| RwLock::new(HashMap::with_capacity(shard_size)))
            .take(num_shards)
            .collect();
    let lock_table = Arc::new(shards);
    let stop = Arc::new(AtomicBool::new(false));

    // The task owns its own handles to the shards and to the stop flag.
    let task_table = Arc::clone(&lock_table);
    let task_stop = Arc::clone(&stop);
    let cleaner = tokio::spawn(async move {
        tokio::time::sleep(cleanup_initial_delay).await;
        loop {
            if task_stop.load(Ordering::SeqCst) {
                break;
            }
            Self::cleanup(Arc::clone(&task_table));
            tokio::time::sleep(cleanup_period).await;
        }
        info!("Stopping mutex table cleanup!");
    });

    Self {
        random_state: RandomState::new(),
        lock_table,
        _k: std::marker::PhantomData,
        _cleaner: cleaner,
        stop,
    }
}

/// Builds a table with the default cleanup schedule: the first cleanup pass runs after
/// 10 seconds and subsequent passes are separated by 10-second sleeps.
pub fn new(num_shards: usize, shard_size: usize) -> Self {
    let cleanup_period = Duration::from_secs(10);
    let cleanup_initial_delay = Duration::from_secs(10);
    Self::new_with_cleanup(num_shards, shard_size, cleanup_period, cleanup_initial_delay)
}

/// Runs one eviction pass over every shard of `lock_table`.
///
/// The pass never waits: a shard whose write lock cannot be taken immediately is skipped,
/// and within a shard only entries whose mutex can be locked right now (i.e. nobody holds
/// it) are removed.
pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) {
    for shard in lock_table.iter() {
        // Contended shard: leave it for a later pass.
        if let Ok(mut entries) = shard.try_write() {
            entries.retain(|_key, mutex| {
                // A failed try_lock means the mutex is held, so the entry must stay.
                let attempt = mutex.try_lock();
                attempt.is_err()
            });
        }
    }
}

Expand Down Expand Up @@ -72,4 +136,115 @@ impl<'b, K: Hash + std::cmp::Eq + 'b> MutexTable<K> {
LockGuard(element.clone().lock_owned().await)
}
}

/// Attempts to take the lock for `k` without waiting.
///
/// Returns `LockTableLocked` when the shard's `RwLock` is unavailable and
/// `LockEntryLocked` when the key's mutex is already held. A missing entry is created
/// under the shard's write lock and then locked.
pub fn try_acquire_lock(&self, k: K) -> Result<LockGuard, TryAcquireLockError> {
    let lock_idx = self.get_lock_idx(&k);
    let shard = &self.lock_table[lock_idx];

    {
        // Fast path: the entry already exists, so a shared read lock on the shard suffices.
        let readable = shard
            .try_read()
            .map_err(|_| TryAcquireLockError::LockTableLocked)?;
        if let Some(existing) = readable.get(&k) {
            return existing
                .clone()
                .try_lock_owned()
                .map(LockGuard)
                .map_err(|_| TryAcquireLockError::LockEntryLocked);
        }
        // The read guard is released at the end of this scope, before the write attempt.
    }

    // element doesn't exist
    let mut writable = shard
        .try_write()
        .map_err(|_| TryAcquireLockError::LockTableLocked)?;
    let entry = writable
        .entry(k)
        .or_insert_with(|| Arc::new(Mutex::new(())));
    match entry.clone().try_lock_owned() {
        Ok(guard) => Ok(LockGuard(guard)),
        Err(e) => {
            error!("Failed to acquire lock after creation: {:?}", e);
            Err(TryAcquireLockError::LockEntryLocked)
        }
    }
}
}

impl<K: Hash> Drop for MutexTable<K> {
    /// Shuts down the background cleanup task when the table goes away.
    fn drop(&mut self) {
        // Signal the cooperative stop flag the task polls between passes.
        self.stop.store(true, Ordering::SeqCst);
        // The flag is only observed when the task wakes from a sleep, which can be far in
        // the future (e.g. an initial delay of Duration::MAX). Abort the task as well so it,
        // and the shard storage it keeps alive, are released promptly.
        self._cleaner.abort();
    }
}

#[tokio::test]
async fn test_mutex_table() {
    // An initial delay of Duration::MAX keeps the background cleaner from running, so
    // every cleanup pass in this test is triggered explicitly.
    let table =
        MutexTable::<String>::new_with_cleanup(1, 128, Duration::from_secs(10), Duration::MAX);

    // Runs one cleanup pass, then reports how many entries the single shard still holds.
    let entries_after_cleanup = || {
        MutexTable::cleanup(table.lock_table.clone());
        let shard = table.lock_table[0].try_read();
        assert!(shard.is_ok());
        shard.unwrap().len()
    };

    // A held key cannot be acquired a second time.
    let first_john = table.try_acquire_lock("john".to_string());
    assert!(first_john.is_ok());
    let second_john = table.try_acquire_lock("john".to_string());
    assert!(second_john.is_err());

    // Once released, the same key can be taken again.
    drop(first_john);
    let second_john = table.try_acquire_lock("john".to_string());
    assert!(second_john.is_ok());
    let jane = table.try_acquire_lock("jane".to_string());
    assert!(jane.is_ok());

    // Held entries survive cleanup; released ones are evicted.
    assert_eq!(entries_after_cleanup(), 2);
    drop(second_john);
    assert_eq!(entries_after_cleanup(), 1);
    drop(jane);
    assert_eq!(entries_after_cleanup(), 0);
}

#[tokio::test]
async fn test_mutex_table_bg_cleanup() {
    let mutex_table = MutexTable::<String>::new_with_cleanup(
        1,
        128,
        Duration::from_secs(5),
        Duration::from_secs(1),
    );
    let keys = ["lock1", "lock2", "lock3", "lock4", "lock5"];

    // Take every lock once; all attempts must succeed.
    let held: Vec<_> = keys
        .iter()
        .map(|key| mutex_table.try_acquire_lock(key.to_string()))
        .collect();
    assert!(held.iter().all(|attempt| attempt.is_ok()));

    // Trigger cleanup
    MutexTable::cleanup(mutex_table.lock_table.clone());

    // Try acquiring locks again, these should still fail because locks have not been released
    let retried: Vec<_> = keys
        .iter()
        .map(|key| mutex_table.try_acquire_lock(key.to_string()))
        .collect();
    assert!(retried.iter().all(|attempt| attempt.is_err()));

    // drop all locks
    drop(held);

    // Wait for bg cleanup to be triggered
    tokio::time::sleep(Duration::from_secs(10)).await;
    for shard in mutex_table.lock_table.iter() {
        let entries = shard.read().await;
        assert!(entries.is_empty());
    }
}
5 changes: 2 additions & 3 deletions crates/test-utils/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub async fn increment_counter(
package_ref: ObjectRef,
counter_id: ObjectID,
) -> SuiTransactionResponse {
let resp = submit_move_transaction(
submit_move_transaction(
context,
"counter",
"increment",
Expand All @@ -160,8 +160,7 @@ pub async fn increment_counter(
sender,
gas_object,
)
.await;
resp
.await
}

/// Submit a certificate containing only owned-objects to all authorities.
Expand Down

0 comments on commit 5ae5367

Please sign in to comment.