Skip to content

Commit

Permalink
Factor lock_table out into a new module. (MystenLabs#2394)
Browse files Browse the repository at this point in the history
* Factor lock_table out into a new module.

- Call it MutexTable to reduce confusion with distributed locks

* Address PR comments

* lint fix
  • Loading branch information
mystenmark authored Jun 3, 2022
1 parent 394a352 commit 245a690
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 11 additions & 26 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use narwhal_executor::ExecutionIndices;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::BTreeSet;
use std::path::Path;
use sui_storage::{default_db_options, LockService};
use sui_storage::{
default_db_options,
mutex_table::{LockGuard, MutexTable},
LockService,
};
use sui_types::base_types::SequenceNumber;
use sui_types::batch::{SignedBatch, TxSequenceNumber};
use sui_types::committee::EpochId;
Expand Down Expand Up @@ -56,7 +59,7 @@ pub struct SuiDataStore<const ALL_OBJ_VER: bool, const USE_LOCKS: bool, S> {
lock_service: LockService,

/// Internal vector of locks to manage concurrent writes to the database
lock_table: Vec<tokio::sync::Mutex<()>>,
mutex_table: MutexTable<ObjectDigest>,

/// This is a an index of object references to currently existing objects, indexed by the
/// composite key of the SuiAddress of their owner and the object ID of the object.
Expand Down Expand Up @@ -186,10 +189,7 @@ impl<
objects,
all_object_versions,
lock_service,
lock_table: (0..NUM_SHARDS)
.into_iter()
.map(|_| tokio::sync::Mutex::new(()))
.collect(),
mutex_table: MutexTable::new(NUM_SHARDS),
owner_index,
transactions,
certificates,
Expand Down Expand Up @@ -257,29 +257,14 @@ impl<
}

/// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
async fn acquire_locks<'a, 'b>(
&'a self,
input_objects: &'b [ObjectRef],
) -> Vec<tokio::sync::MutexGuard<'a, ()>> {
async fn acquire_locks<'a, 'b>(&'a self, input_objects: &'b [ObjectRef]) -> Vec<LockGuard<'a>> {
if !USE_LOCKS {
return vec![];
}

let num_locks = self.lock_table.len();
// TODO: randomize the lock mapping based on a secret to avoid DoS attacks.
let lock_numbers: BTreeSet<usize> = input_objects
.iter()
.map(|(_, _, digest)| {
usize::from_le_bytes(digest.0[0..8].try_into().unwrap()) % num_locks
})
.collect();
// Note: we need to iterate over the sorted unique elements, hence the use of a Set
// in order to prevent deadlocks when trying to acquire many locks.
let mut locks = Vec::new();
for lock_seq in lock_numbers {
locks.push(self.lock_table[lock_seq].lock().await);
}
locks
self.mutex_table
.acquire_locks(input_objects.iter().map(|(_, _, digest)| digest))
.await
}

// Methods to read the store
Expand Down
1 change: 1 addition & 0 deletions crates/sui-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1.0.55"
fdlimit = "0.2.1"
futures = "0.3.21"
flexstr = "^0.9"
rand = "0.7.3"
serde = { version = "1.0.136", features = ["derive"] }
tokio = { version = "1.17.0", features = ["full", "tracing"] }
rocksdb = "0.18.0"
Expand Down
1 change: 1 addition & 0 deletions crates/sui-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod indexes;
pub use indexes::IndexStore;

pub mod event_store;
pub mod mutex_table;

use rocksdb::Options;

Expand Down
72 changes: 72 additions & 0 deletions crates/sui-storage/src/mutex_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::{DefaultHasher, RandomState};
use std::hash::{BuildHasher, Hash, Hasher};

// MutexTable supports mutual exclusion on keys such as TransactionDigest or ObjectDigest
pub struct MutexTable<K: Hash> {
random_state: RandomState,
lock_table: Vec<tokio::sync::Mutex<()>>,
_k: std::marker::PhantomData<K>,
}

// Opaque struct to hide tokio::sync::MutexGuard.
pub struct LockGuard<'a> {
_guard: tokio::sync::MutexGuard<'a, ()>,
}

impl<'b, K: Hash + 'b> MutexTable<K> {
pub fn new(size: usize) -> Self {
Self {
random_state: RandomState::new(),
lock_table: (0..size)
.into_iter()
.map(|_| tokio::sync::Mutex::new(()))
.collect(),
_k: std::marker::PhantomData {},
}
}

fn get_lock_idx(&self, key: &K) -> usize {
let mut hasher = if !cfg!(test) {
self.random_state.build_hasher()
} else {
// be deterministic for tests
DefaultHasher::new()
};

key.hash(&mut hasher);
// unwrap ok - converting u64 -> usize
let hash: usize = hasher.finish().try_into().unwrap();
hash % self.lock_table.len()
}

pub async fn acquire_locks<'a, I>(&'a self, object_iter: I) -> Vec<LockGuard<'a>>
where
I: IntoIterator<Item = &'b K>,
{
let mut locks: Vec<usize> = object_iter
.into_iter()
.map(|o| self.get_lock_idx(o))
.collect();

locks.sort_unstable();
locks.dedup();

let mut guards = Vec::with_capacity(locks.len());
for lock_idx in locks {
guards.push(LockGuard {
_guard: self.lock_table[lock_idx].lock().await,
});
}
guards
}

pub async fn acquire_lock<'a>(&'a self, k: &K) -> LockGuard<'a> {
let lock_idx = self.get_lock_idx(k);
LockGuard {
_guard: self.lock_table[lock_idx].lock().await,
}
}
}

0 comments on commit 245a690

Please sign in to comment.