Skip to content

Commit

Permalink
[sally-api] Add missing sally iter functions (MystenLabs#8094)
Browse files Browse the repository at this point in the history
Adds the missing iter functions which are needed for migrating perpetual
db
  • Loading branch information
sadhansood authored Feb 15, 2023
1 parent 7a661d1 commit bf641f7
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 9 deletions.
63 changes: 62 additions & 1 deletion crates/typed-store/src/sally/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ use crate::{
traits::{AsyncMap, Map},
};

use crate::rocks::iter::Iter as RocksDBIter;
use crate::rocks::iter::{Iter as RocksDBIter, RevIter};
use crate::rocks::{DBMapTableConfigMap, MetricConf};
use crate::test_db::TestDBRevIter;
use async_trait::async_trait;
use collectable::TryExtend;
use rocksdb::Options;
Expand Down Expand Up @@ -415,6 +416,66 @@ impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyIter<'a, K,
}
}

impl<'a, K: Serialize, V> SallyIter<'a, K, V> {
/// Skips all the elements that are smaller than the given key,
/// and either lands on the key or the first one greater than
/// the key.
pub fn skip_to(self, key: &K) -> Result<Self, TypedStoreError> {
let iter = match self {
SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to(key)?),
SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to(key)?),
};
Ok(iter)
}

/// Moves the iterator the element given or
/// the one prior to it if it does not exist. If there is
/// no element prior to it, it returns an empty iterator.
pub fn skip_prior_to(self, key: &K) -> Result<Self, TypedStoreError> {
let iter = match self {
SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_prior_to(key)?),
SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_prior_to(key)?),
};
Ok(iter)
}

/// Seeks to the last key in the database (at this column family).
pub fn skip_to_last(self) -> Self {
match self {
SallyIter::RocksDB(iter) => SallyIter::RocksDB(iter.skip_to_last()),
SallyIter::TestDB(iter) => SallyIter::TestDB(iter.skip_to_last()),
}
}

/// Will make the direction of the iteration reverse and will
/// create a new `RevIter` to consume. Every call to `next` method
/// will give the next element from the end.
pub fn reverse(self) -> SallyRevIter<'a, K, V> {
match self {
SallyIter::RocksDB(iter) => SallyRevIter::RocksDB(iter.reverse()),
SallyIter::TestDB(iter) => SallyRevIter::TestDB(iter.reverse()),
}
}
}

pub enum SallyRevIter<'a, K, V> {
// Iter for a rocksdb backed sally column when `fallback_to_db` is true
RocksDB(RevIter<'a, K, V>),
TestDB(TestDBRevIter<'a, K, V>),
}

impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iterator for SallyRevIter<'a, K, V> {
type Item = (K, V);

/// Will give the next item backwards
fn next(&mut self) -> Option<Self::Item> {
match self {
SallyRevIter::RocksDB(rev_iter) => rev_iter.next(),
SallyRevIter::TestDB(rev_iter) => rev_iter.next(),
}
}
}

/// A SallyIter provides an iterator over all keys in a sally column
pub enum SallyKeys<'a, K> {
// Iter for a rocksdb backed sally column when `fallback_to_db` is true
Expand Down
108 changes: 100 additions & 8 deletions crates/typed-store/src/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bincode::Options;
use collectable::TryExtend;
use ouroboros::self_referencing;
use rand::distributions::{Alphanumeric, DistString};
use rocksdb::Direction;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::{RwLockReadGuard, RwLockWriteGuard};

Expand All @@ -42,30 +43,31 @@ impl<K, V> TestDB<K, V> {
}
}

#[self_referencing]
#[self_referencing(pub_extras)]
pub struct TestDBIter<'a, K, V> {
rows: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
pub rows: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
#[borrows(mut rows)]
#[covariant]
iter: Iter<'this, Vec<u8>, Vec<u8>>,
pub iter: Iter<'this, Vec<u8>, Vec<u8>>,
phantom: PhantomData<(K, V)>,
pub direction: Direction,
}

#[self_referencing]
#[self_referencing(pub_extras)]
pub struct TestDBKeys<'a, K> {
rows: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
#[borrows(mut rows)]
#[covariant]
iter: Iter<'this, Vec<u8>, Vec<u8>>,
pub iter: Iter<'this, Vec<u8>, Vec<u8>>,
phantom: PhantomData<K>,
}

#[self_referencing]
#[self_referencing(pub_extras)]
pub struct TestDBValues<'a, V> {
rows: RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>,
#[borrows(mut rows)]
#[covariant]
iter: Iter<'this, Vec<u8>, Vec<u8>>,
pub iter: Iter<'this, Vec<u8>, Vec<u8>>,
phantom: PhantomData<V>,
}

Expand All @@ -78,7 +80,11 @@ impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iterator for TestDBIter<'a, K
.with_big_endian()
.with_fixint_encoding();
self.with_mut(|fields| {
if let Some((raw_key, raw_value)) = fields.iter.next() {
let resp = match fields.direction {
Direction::Forward => fields.iter.next(),
Direction::Reverse => panic!("Reverse iteration not supported in test db"),
};
if let Some((raw_key, raw_value)) = resp {
let key: K = config.deserialize(raw_key).ok().unwrap();
let value: V = bincode::deserialize(raw_value).ok().unwrap();
out = Some((key, value));
Expand All @@ -88,6 +94,91 @@ impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iterator for TestDBIter<'a, K
}
}

impl<'a, K: Serialize, V> TestDBIter<'a, K, V> {
/// Skips all the elements that are smaller than the given key,
/// and either lands on the key or the first one greater than
/// the key.
pub fn skip_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
self.with_mut(|fields| {
let serialized_key = be_fix_int_ser(key).expect("serialization failed");
let mut peekable = fields.iter.peekable();
let mut peeked = peekable.peek();
while peeked.is_some() {
let serialized = be_fix_int_ser(peeked.unwrap()).expect("serialization failed");
if serialized >= serialized_key {
break;
} else {
peekable.next();
peeked = peekable.peek();
}
}
});
Ok(self)
}

/// Moves the iterator to the element given or
/// the one prior to it if it does not exist. If there is
/// no element prior to it, it returns an empty iterator.
pub fn skip_prior_to(mut self, key: &K) -> Result<Self, TypedStoreError> {
self.with_mut(|fields| {
let serialized_key = be_fix_int_ser(key).expect("serialization failed");
let mut peekable = fields.iter.peekable();
let mut peeked = peekable.peek();
while peeked.is_some() {
let serialized = be_fix_int_ser(peeked.unwrap()).expect("serialization failed");
if serialized > serialized_key {
break;
} else {
peekable.next();
peeked = peekable.peek();
}
}
});
Ok(self)
}

/// Seeks to the last key in the database (at this column family).
pub fn skip_to_last(mut self) -> Self {
self.with_mut(|fields| {
fields.iter.last();
});
self
}

/// Will make the direction of the iteration reverse and will
/// create a new `RevIter` to consume. Every call to `next` method
/// will give the next element from the end.
pub fn reverse(mut self) -> TestDBRevIter<'a, K, V> {
self.with_mut(|fields| {
*fields.direction = Direction::Reverse;
});
TestDBRevIter::new(self)
}
}

/// An iterator with a reverted direction to the original. The `RevIter`
/// is hosting an iteration which is consuming in the opposing direction.
/// It's not possible to do further manipulation (ex re-reverse) to the
/// iterator.
pub struct TestDBRevIter<'a, K, V> {
iter: TestDBIter<'a, K, V>,
}

impl<'a, K, V> TestDBRevIter<'a, K, V> {
fn new(iter: TestDBIter<'a, K, V>) -> Self {
Self { iter }
}
}

impl<'a, K: DeserializeOwned, V: DeserializeOwned> Iterator for TestDBRevIter<'a, K, V> {
type Item = (K, V);

/// Will give the next item backwards
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

impl<'a, K: DeserializeOwned> Iterator for TestDBKeys<'a, K> {
type Item = K;

Expand Down Expand Up @@ -182,6 +273,7 @@ where
rows: self.rows.read().unwrap(),
iter_builder: |rows: &mut RwLockReadGuard<'a, BTreeMap<Vec<u8>, Vec<u8>>>| rows.iter(),
phantom: PhantomData,
direction: Direction::Forward,
}
.build()
}
Expand Down

0 comments on commit bf641f7

Please sign in to comment.