Skip to content

Commit

Permalink
[common/meta/sled-store] feature: txn-tree: add update_and_fetch()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Dec 9, 2021
1 parent 0b44dca commit b1a9b70
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 37 deletions.
36 changes: 36 additions & 0 deletions common/meta/sled-store/src/sled_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,31 @@ impl TransactionSledTree<'_> {

Ok(removed)
}

pub fn update_and_fetch<KV, F>(
&self,
key: &KV::K,
mut f: F,
) -> Result<Option<KV::V>, UnabortableTransactionError>
where
KV: SledKeySpace,
F: FnMut(Option<KV::V>) -> Option<KV::V>,
{
let key_ivec = KV::serialize_key(key).unwrap();

let old_val_ivec = self.txn_tree.get(&key_ivec)?;
let old_val = old_val_ivec.map(|o| KV::deserialize_value(o).unwrap());

let new_val = f(old_val);
let _ = match new_val {
Some(ref v) => self
.txn_tree
.insert(key_ivec, KV::serialize_value(v).unwrap())?,
None => self.txn_tree.remove(key_ivec)?,
};

Ok(new_val)
}
}

/// It borrows the internal SledTree with access limited to a specified namespace `KV`.
Expand Down Expand Up @@ -565,6 +590,17 @@ impl<'a, KV: SledKeySpace> AsTxnKeySpace<'a, KV> {
pub fn remove(&self, key: &KV::K) -> Result<Option<KV::V>, UnabortableTransactionError> {
self.inner.remove::<KV>(key)
}

pub fn update_and_fetch<F>(
&self,
key: &KV::K,
f: F,
) -> Result<Option<KV::V>, UnabortableTransactionError>
where
F: FnMut(Option<KV::V>) -> Option<KV::V>,
{
self.inner.update_and_fetch::<KV, _>(key, f)
}
}

/// Some methods that take `&TransactionSledTree` as parameter need to be called
Expand Down
1 change: 1 addition & 0 deletions common/meta/sled-store/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.

mod sled_tree;
mod sled_txn_tree;
mod testing;
40 changes: 3 additions & 37 deletions common/meta/sled-store/tests/it/sled_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ use async_raft::raft::Entry;
use async_raft::raft::EntryNormal;
use async_raft::raft::EntryPayload;
use common_base::tokio;
use common_base::GlobalSequence;
use common_meta_sled_store::get_sled_db;
use common_meta_sled_store::SledTree;
use common_meta_types::Cmd;
use common_meta_types::LogEntry;
use common_meta_types::LogId;
use common_meta_types::LogIndex;
use common_meta_types::SeqV;
use testing::new_sled_test_context;

use crate::init_sled_ut;
use crate::testing;
use crate::testing::fake_key_spaces::Files;
use crate::testing::fake_key_spaces::GenericKV;
use crate::testing::fake_key_spaces::Logs;
Expand All @@ -34,24 +35,6 @@ use crate::testing::fake_state_machine_meta::StateMachineMetaKey::Initialized;
use crate::testing::fake_state_machine_meta::StateMachineMetaKey::LastApplied;
use crate::testing::fake_state_machine_meta::StateMachineMetaValue;

/// 1. Open a temp sled::Db for all tests.
/// 2. Initialize a global tracing.
/// 3. Create a span for a test case. One needs to enter it by `span.enter()` and keeps the guard held.
#[macro_export]
macro_rules! init_sled_ut {
() => {{
let t = tempfile::tempdir().expect("create temp dir to sled db");

common_meta_sled_store::init_temp_sled_db(t);
common_tracing::init_default_ut_tracing();

let name = common_tracing::func_name!();
let span =
common_tracing::tracing::debug_span!("ut", "{}", name.split("::").last().unwrap());
((), span)
}};
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_sled_tree_open() -> anyhow::Result<()> {
let (_log_guards, ut_span) = init_sled_ut!();
Expand Down Expand Up @@ -1428,20 +1411,3 @@ async fn test_as_multi_types() -> anyhow::Result<()> {

Ok(())
}

pub struct SledTestContext {
pub tree_name: String,
pub db: sled::Db,
}

/// Create a new context for testing sled
pub fn new_sled_test_context() -> SledTestContext {
SledTestContext {
tree_name: format!("test-{}-", next_port()),
db: get_sled_db(),
}
}

pub fn next_port() -> u32 {
29000u32 + (GlobalSequence::next() as u32)
}
96 changes: 96 additions & 0 deletions common/meta/sled-store/tests/it/sled_txn_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::tokio;
use common_meta_sled_store::SledTree;
use common_meta_types::Node;

use crate::init_sled_ut;
use crate::testing::fake_key_spaces::Nodes;
use crate::testing::new_sled_test_context;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_sled_txn_tree_update_and_fetch() -> anyhow::Result<()> {
// Test transactional API update_and_fetch on TransactionSledTree

let (_log_guards, ut_span) = init_sled_ut!();
let _ent = ut_span.enter();

let tc = new_sled_test_context();
let db = &tc.db;
let tree = SledTree::open(db, tc.tree_name, true)?;
tree.txn(false, |txn_tree| {
let k = 100;

for _i in 0..3 {
txn_tree.update_and_fetch::<Nodes, _>(&k, |old| match old {
Some(v) => Some(Node {
name: v.name + "a",
address: v.address,
}),
None => Some(Node::default()),
})?;
}

Ok(())
})?;

let got = tree.get::<Nodes>(&100)?.unwrap();
assert_eq!(
"aa".to_string(),
got.name,
"1st time create a default. then append 2 'a'"
);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_sled_txn_tree_key_space_update_and_fetch() -> anyhow::Result<()> {
// Test transactional API update_and_fetch on a sub key space of TransactionSledTree

let (_log_guards, ut_span) = init_sled_ut!();
let _ent = ut_span.enter();

let tc = new_sled_test_context();
let db = &tc.db;
let tree = SledTree::open(db, tc.tree_name, true)?;
tree.txn(false, |txn_tree| {
let k = 100;

// sub tree key space
let nodes_ks = txn_tree.key_space::<Nodes>();

for _i in 0..3 {
nodes_ks.update_and_fetch(&k, |old| match old {
Some(v) => Some(Node {
name: v.name + "a",
address: v.address,
}),
None => Some(Node::default()),
})?;
}

Ok(())
})?;

let got = tree.get::<Nodes>(&100)?.unwrap();
assert_eq!(
"aa".to_string(),
got.name,
"1st time create a default. then append 2 'a'"
);

Ok(())
}
38 changes: 38 additions & 0 deletions common/meta/sled-store/tests/it/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,41 @@

pub mod fake_key_spaces;
pub mod fake_state_machine_meta;

use common_base::GlobalSequence;
use common_meta_sled_store::get_sled_db;

/// 1. Open a temp sled::Db for all tests.
/// 2. Initialize a global tracing.
/// 3. Create a span for a test case. One needs to enter it by `span.enter()` and keeps the guard held.
#[macro_export]
macro_rules! init_sled_ut {
() => {{
let t = tempfile::tempdir().expect("create temp dir to sled db");

common_meta_sled_store::init_temp_sled_db(t);
common_tracing::init_default_ut_tracing();

let name = common_tracing::func_name!();
let span =
common_tracing::tracing::debug_span!("ut", "{}", name.split("::").last().unwrap());
((), span)
}};
}

pub struct SledTestContext {
pub tree_name: String,
pub db: sled::Db,
}

/// Create a new context for testing sled
pub fn new_sled_test_context() -> SledTestContext {
SledTestContext {
tree_name: format!("test-{}-", next_seq()),
db: get_sled_db(),
}
}

pub fn next_seq() -> u32 {
29000u32 + (GlobalSequence::next() as u32)
}

0 comments on commit b1a9b70

Please sign in to comment.