Skip to content

Commit

Permalink
[rocksdb] introduce safe iterator (MystenLabs#10203)
Browse files Browse the repository at this point in the history
currently there's a bug in the way we use iterators in the codebase
we check iterators for
[validity](https://github.com/MystenLabs/sui/blob/main/crates/typed-store/src/rocks/iter.rs#L45),
but do not check for potential
[errors](https://github.com/rust-rocksdb/rust-rocksdb/blob/master/src/db_iterator.rs#L132).
So we can't really differentiate if the iterator stopped because it was
fully consumed or there was an error.

This PR introduces a new temporary method `safe_iter`. It's similar to
the existing `iter` method but returns an iterator of results instead.
We use iterators a lot, so migration of call sites will be split into a
few batches. Once migration is done, `safe_iter` will be renamed back to
`iter`
 
Notes:
* `.keys` and `.values` methods are fully migrated
* `typed_store` internal call sites (including sally) and tests are
migrated
*  `typed_store::Store` struct is deprecated and removed
  • Loading branch information
phoenix-o authored Apr 3, 2023
1 parent 773a7be commit e22fa94
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 768 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,7 @@ impl AuthorityState {
Self::choose_protocol_version_and_system_packages(
epoch_store.protocol_version(),
epoch_store.committee(),
epoch_store.get_capabilities(),
epoch_store.get_capabilities()?,
buffer_stake_bps,
);

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ impl AuthorityPerEpochStore {
Ok(())
}

pub fn get_capabilities(&self) -> Vec<AuthorityCapabilities> {
pub fn get_capabilities(&self) -> Result<Vec<AuthorityCapabilities>, TypedStoreError> {
self.tables.authority_capabilities.values().collect()
}

Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,7 @@ impl CheckpointServiceNotify for CheckpointService {
.keys()
.skip_to_last()
.next()
.transpose()?
{
if sequence <= last_certified {
debug!(
Expand Down
11 changes: 2 additions & 9 deletions crates/typed-store-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ fn extract_generics_names(generics: &Generics) -> Vec<Ident> {
/// use typed_store::rocks::DBOptions;
/// use typed_store::rocks::DBMap;
/// use typed_store::rocks::MetricConf;
/// use typed_store::Store;
/// use typed_store_derive::DBMapUtils;
/// use typed_store::traits::TypedStoreDebug;
/// use typed_store::traits::TableSummary;
Expand Down Expand Up @@ -228,7 +227,6 @@ fn extract_generics_names(generics: &Generics) -> Vec<Ident> {
///```
/// use typed_store::rocks::DBOptions;
/// use typed_store::rocks::DBMap;
/// use typed_store::Store;
/// use typed_store_derive::DBMapUtils;
/// use typed_store::traits::TypedStoreDebug;
/// use core::fmt::Error;
Expand Down Expand Up @@ -289,13 +287,8 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
let generics = &input.generics;
let generics_names = extract_generics_names(generics);

let allowed_types_with_post_process_fn: BTreeMap<_, _> = [
("SallyColumn", ""),
("DBMap", ""),
("Store", "typed_store::Store::new"),
]
.into_iter()
.collect();
let allowed_types_with_post_process_fn: BTreeMap<_, _> =
[("SallyColumn", ""), ("DBMap", "")].into_iter().collect();
let allowed_strs = allowed_types_with_post_process_fn
.keys()
.map(|s| s.to_string())
Expand Down
297 changes: 0 additions & 297 deletions crates/typed-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,310 +8,13 @@
rust_2021_compatibility
)]

use eyre::Result;
use serde::{de::DeserializeOwned, Serialize};
use std::{
cmp::Eq,
collections::{HashMap, VecDeque},
hash::Hash,
sync::Arc,
};
use tokio::sync::{
mpsc::{channel, Sender},
oneshot,
};

pub mod traits;
pub use traits::Map;
pub mod metrics;
pub mod rocks;
use crate::rocks::RocksDB;
pub use rocks::TypedStoreError;
pub mod sally;
pub mod test_db;
pub use metrics::DBMetrics;

#[cfg(test)]
#[path = "tests/store_tests.rs"]
pub mod store_tests;

pub type StoreError = rocks::TypedStoreError;

type StoreResult<T> = Result<T, StoreError>;
type IterPredicate<Key, Value> = dyn Fn(&(Key, Value)) -> bool + Send;

pub enum StoreCommand<Key, Value> {
Write(Key, Value, Option<oneshot::Sender<StoreResult<()>>>),
WriteAll(Vec<(Key, Value)>, oneshot::Sender<StoreResult<()>>),
Delete(Key),
DeleteAll(Vec<Key>, oneshot::Sender<StoreResult<()>>),
Read(Key, oneshot::Sender<StoreResult<Option<Value>>>),
ReadRawBytes(Key, oneshot::Sender<StoreResult<Option<Vec<u8>>>>),
ReadAll(Vec<Key>, oneshot::Sender<StoreResult<Vec<Option<Value>>>>),
NotifyRead(Key, oneshot::Sender<StoreResult<Option<Value>>>),
Iter(
Option<Box<IterPredicate<Key, Value>>>,
oneshot::Sender<HashMap<Key, Value>>,
),
}

#[derive(Clone)]
pub struct Store<K, V> {
channel: Sender<StoreCommand<K, V>>,
pub rocksdb: Arc<RocksDB>,
}

impl<Key, Value> Store<Key, Value>
where
Key: Hash + Eq + Serialize + DeserializeOwned + Send + 'static,
Value: Serialize + DeserializeOwned + Send + Clone + 'static,
{
pub fn new(keyed_db: rocks::DBMap<Key, Value>) -> Self {
let mut obligations = HashMap::<Key, VecDeque<oneshot::Sender<_>>>::new();
let clone_db = keyed_db.rocksdb.clone();
let (tx, mut rx) = channel(100);
tokio::spawn(async move {
while let Some(command) = rx.recv().await {
match command {
StoreCommand::Write(key, value, sender) => {
let response = keyed_db.insert(&key, &value);
if response.is_ok() {
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(Some(value.clone())));
}
}
}
if let Some(replier) = sender {
let _ = replier.send(response);
}
}
StoreCommand::WriteAll(key_values, sender) => {
let response =
keyed_db.multi_insert(key_values.iter().map(|(k, v)| (k, v)));

if response.is_ok() {
for (key, _) in key_values {
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(None));
}
}
}
}
let _ = sender.send(response);
}
StoreCommand::Delete(key) => {
let _ = keyed_db.remove(&key);
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(None));
}
}
}
StoreCommand::DeleteAll(keys, sender) => {
let response = keyed_db.multi_remove(keys.iter());
// notify the obligations only when the delete was successful
if response.is_ok() {
for key in keys {
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(None));
}
}
}
}
let _ = sender.send(response);
}
StoreCommand::Read(key, sender) => {
let response = keyed_db.get(&key);
let _ = sender.send(response);
}
StoreCommand::ReadAll(keys, sender) => {
let response = keyed_db.multi_get(keys.as_slice());
let _ = sender.send(response);
}
StoreCommand::NotifyRead(key, sender) => {
let response = keyed_db.get(&key);
if let Ok(Some(_)) = response {
let _ = sender.send(response);
} else {
obligations
.entry(key)
.or_insert_with(VecDeque::new)
.push_back(sender)
}
}
StoreCommand::Iter(predicate, sender) => {
let response = if let Some(func) = predicate {
keyed_db.iter().filter(func).collect()
} else {
// Beware, we may overload the memory with a large table!
keyed_db.iter().collect()
};

let _ = sender.send(response);
}
StoreCommand::ReadRawBytes(key, sender) => {
let response = keyed_db.get_raw_bytes(&key);
let _ = sender.send(response);
}
}
}
});
Self {
channel: tx,
rocksdb: clone_db,
}
}
}

impl<Key, Value> Store<Key, Value>
where
Key: Serialize + DeserializeOwned + Send,
Value: Serialize + DeserializeOwned + Send,
{
pub async fn async_write(&self, key: Key, value: Value) {
if let Err(e) = self
.channel
.send(StoreCommand::Write(key, value, None))
.await
{
panic!("Failed to send Write command to store: {e}");
}
}

pub async fn sync_write(&self, key: Key, value: Value) -> StoreResult<()> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::Write(key, value, Some(sender)))
.await
{
panic!("Failed to send Write command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to Write command from store")
}

/// Atomically writes all the key-value pairs in storage.
/// If the operation is successful, then the result will be a non
/// error empty result. Otherwise the error is returned.
pub async fn sync_write_all(
&self,
key_value_pairs: impl IntoIterator<Item = (Key, Value)>,
) -> StoreResult<()> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::WriteAll(
key_value_pairs.into_iter().collect(),
sender,
))
.await
{
panic!("Failed to send WriteAll command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to WriteAll command from store")
}

pub async fn remove(&self, key: Key) {
if let Err(e) = self.channel.send(StoreCommand::Delete(key)).await {
panic!("Failed to send Delete command to store: {e}");
}
}

/// Atomically removes all the data referenced by the provided keys.
/// If the operation is successful, then the result will be a non
/// error empty result. Otherwise the error is returned.
pub async fn remove_all(&self, keys: impl IntoIterator<Item = Key>) -> StoreResult<()> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::DeleteAll(keys.into_iter().collect(), sender))
.await
{
panic!("Failed to send DeleteAll command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to RemoveAll command from store")
}

/// Returns the read value in raw bytes
pub async fn read_raw_bytes(&self, key: Key) -> StoreResult<Option<Vec<u8>>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::ReadRawBytes(key, sender))
.await
{
panic!("Failed to send ReadRawBytes command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to ReadRawBytes command from store")
}

pub async fn read(&self, key: Key) -> StoreResult<Option<Value>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self.channel.send(StoreCommand::Read(key, sender)).await {
panic!("Failed to send Read command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to Read command from store")
}

/// Fetches all the values for the provided keys.
pub async fn read_all(
&self,
keys: impl IntoIterator<Item = Key>,
) -> StoreResult<Vec<Option<Value>>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::ReadAll(keys.into_iter().collect(), sender))
.await
{
panic!("Failed to send ReadAll command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to ReadAll command from store")
}

pub async fn notify_read(&self, key: Key) -> StoreResult<Option<Value>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::NotifyRead(key, sender))
.await
{
panic!("Failed to send NotifyRead command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to NotifyRead command from store")
}

pub async fn iter(
&self,
predicate: Option<Box<IterPredicate<Key, Value>>>,
) -> HashMap<Key, Value> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::Iter(predicate, sender))
.await
{
panic!("Failed to send Iter command to store: {e}");
}
receiver
.await
.expect("Failed to receive reply to Iter command from store")
}
}
10 changes: 6 additions & 4 deletions crates/typed-store/src/rocks/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ impl<'a, K: DeserializeOwned> Keys<'a, K> {
}

impl<'a, K: DeserializeOwned> Iterator for Keys<'a, K> {
type Item = K;
type Item = Result<K, TypedStoreError>;

fn next(&mut self) -> Option<Self::Item> {
if self.db_iter.valid() {
let config = bincode::DefaultOptions::new()
.with_big_endian()
.with_fixint_encoding();
let key = self.db_iter.key().and_then(|k| config.deserialize(k).ok());

self.db_iter.next();
key
key.map(Ok)
} else {
None
match self.db_iter.status() {
Ok(_) => None,
Err(err) => Some(Err(TypedStoreError::RocksDBError(format!("{err}")))),
}
}
}
}
Expand Down
Loading

0 comments on commit e22fa94

Please sign in to comment.