Skip to content

Commit

Permalink
[store] feature: SledTree: add scan_prefix()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 11, 2021
1 parent f48ee64 commit 5552aae
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
23 changes: 23 additions & 0 deletions store/src/meta_service/sled_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,25 @@ impl SledTree {
Ok(res)
}

/// Get key-valuess in with the same prefix
pub fn scan_prefix<KV>(&self, prefix: &KV::K) -> common_exception::Result<Vec<(KV::K, KV::V)>>
where KV: SledKeySpace {
let mut res = vec![];

let mes = || format!("scan_prefix: {}", prefix);

let pref = KV::serialize_key(prefix)?;
for item in self.tree.scan_prefix(pref) {
let (k, v) = item.map_err_to_code(ErrorCode::MetaStoreDamaged, mes)?;

let key = KV::deserialize_key(k)?;
let value = KV::deserialize_value(v)?;
res.push((key, value));
}

Ok(res)
}

/// Get values of key in `range`
pub fn range_values<KV, R>(&self, range: R) -> common_exception::Result<Vec<KV::V>>
where
Expand Down Expand Up @@ -417,6 +436,10 @@ impl<'a, KV: SledKeySpace> AsKeySpace<'a, KV> {
self.inner.range::<KV, R>(range)
}

pub fn scan_prefix(&self, prefix: &KV::K) -> common_exception::Result<Vec<(KV::K, KV::V)>> {
self.inner.scan_prefix::<KV>(prefix)
}

pub fn range_values<R>(&self, range: R) -> common_exception::Result<Vec<KV::V>>
where R: RangeBounds<KV::K> {
self.inner.range_values::<KV, R>(range)
Expand Down
69 changes: 68 additions & 1 deletion store/src/meta_service/sled_tree_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_raft::raft::Entry;
use async_raft::raft::EntryNormal;
use async_raft::raft::EntryPayload;
use async_raft::LogId;
use common_metatypes::KVValue;
use common_runtime::tokio;

use crate::meta_service::sled_key_space;
Expand Down Expand Up @@ -248,6 +249,30 @@ async fn test_sledtree_range() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_sledtree_scan_prefix() -> anyhow::Result<()> {
init_store_unittest();

let tc = new_sled_test_context();
let db = &tc.db;
let tree = SledTree::open(db, tc.config.tree_name("foo"), true).await?;

let files: Vec<(String, String)> = vec![
("a".to_string(), "x".to_string()),
("ab".to_string(), "xy".to_string()),
("abc".to_string(), "xyz".to_string()),
("abd".to_string(), "xyZ".to_string()),
("b".to_string(), "y".to_string()),
];

tree.append::<sled_key_space::Files>(&files).await?;

let got = tree.scan_prefix::<sled_key_space::Files>(&"ab".to_string())?;
assert_eq!(files[1..4], got);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_sledtree_insert() -> anyhow::Result<()> {
init_store_unittest();
Expand Down Expand Up @@ -633,7 +658,7 @@ async fn test_sledtree_multi_types() -> anyhow::Result<()> {
Ok(())
}

// --- AsType test ---
// --- key space test ---

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_as_append() -> anyhow::Result<()> {
Expand Down Expand Up @@ -855,6 +880,48 @@ async fn test_as_range() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_as_scan_prefix() -> anyhow::Result<()> {
init_store_unittest();

let tc = new_sled_test_context();
let db = &tc.db;
let tree = SledTree::open(db, tc.config.tree_name("foo"), true).await?;
let file_tree = tree.key_space::<sled_key_space::Files>();
let kv_tree = tree.key_space::<sled_key_space::GenericKV>();

let files: Vec<(String, String)> = vec![
("a".to_string(), "x".to_string()),
("ab".to_string(), "xy".to_string()),
("abc".to_string(), "xyz".to_string()),
("abd".to_string(), "xyZ".to_string()),
("b".to_string(), "y".to_string()),
];

file_tree.append(&files).await?;

let kvs = vec![
("a".to_string(), (1, KVValue::default())),
("ab".to_string(), (2, KVValue::default())),
("b".to_string(), (3, KVValue::default())),
];

kv_tree.append(&kvs).await?;

let got = file_tree.scan_prefix(&"".to_string())?;
assert_eq!(files, got);

let got = file_tree.scan_prefix(&"ab".to_string())?;
assert_eq!(files[1..4], got);

let got = kv_tree.scan_prefix(&"".to_string())?;
assert_eq!(kvs, got);

let got = kv_tree.scan_prefix(&"ab".to_string())?;
assert_eq!(kvs[1..2], got);

Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_as_insert() -> anyhow::Result<()> {
init_store_unittest();
Expand Down
6 changes: 6 additions & 0 deletions store/src/meta_service/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,11 @@ impl StateMachine {
.into_iter()
.take_while(|(k, _)| k.starts_with(prefix));

// Convert expired to None
let x = x.map(|(k, v)| (k, Self::unexpired(v)));
// Remove None
let x = x.filter(|(_k, v)| v.is_some());
// Extract from an Option
let x = x.map(|(k, v)| (k, v.unwrap()));

x.collect()
Expand Down Expand Up @@ -792,9 +795,12 @@ impl StateMachine {
pub fn sm_meta(&self) -> AsKeySpace<StateMachineMeta> {
self.sm_tree.key_space()
}

pub fn nodes(&self) -> AsKeySpace<sled_key_space::Nodes> {
self.sm_tree.key_space()
}

/// The file names stored in this cluster
pub fn files(&self) -> AsKeySpace<sled_key_space::Files> {
self.sm_tree.key_space()
}
Expand Down

0 comments on commit 5552aae

Please sign in to comment.