Skip to content

Commit

Permalink
[rocksdb] prorotype wrapper for transactional and regular db (MystenL…
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o authored Dec 19, 2022
1 parent cbc31a7 commit f12b4e8
Show file tree
Hide file tree
Showing 8 changed files with 484 additions and 125 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 24 additions & 16 deletions crates/typed-store-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,6 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
let secondary_db_map_struct_name: proc_macro2::TokenStream =
secondary_db_map_struct_name_str.parse().unwrap();

let first_field_name = field_names
.get(0)
.expect("Expected at least one field")
.clone();

TokenStream::from(quote! {

// <----------- This section generates the configurator struct -------------->
Expand Down Expand Up @@ -401,6 +396,7 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
pub fn open_tables_impl(
path: std::path::PathBuf,
as_secondary_with_path: Option<std::path::PathBuf>,
is_transaction: bool,
global_db_options_override: Option<rocksdb::Options>,
tables_db_options_override: Option<typed_store::rocks::DBMapTableConfigMap>
) -> Self {
Expand All @@ -420,9 +416,10 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
};
// Safe to call unwrap because we will have atleast one field_name entry in the struct
let opt_cfs: Vec<_> = opt_cfs.iter().map(|q| (q.0.as_str(), &q.1.options)).collect();
let db = match as_secondary_with_path {
Some(p) => typed_store::rocks::open_cf_opts_secondary(path, Some(&p), global_db_options_override, &opt_cfs),
None => typed_store::rocks::open_cf_opts(path, global_db_options_override, &opt_cfs)
let db = match (as_secondary_with_path, is_transaction) {
(Some(p), _) => typed_store::rocks::open_cf_opts_secondary(path, Some(&p), global_db_options_override, &opt_cfs),
(_, true) => typed_store::rocks::open_cf_opts_transactional(path, global_db_options_override, &opt_cfs),
_ => typed_store::rocks::open_cf_opts(path, global_db_options_override, &opt_cfs)
};
db
}.expect("Cannot open DB.");
Expand Down Expand Up @@ -460,19 +457,30 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
global_db_options_override: Option<rocksdb::Options>,
tables_db_options_override: Option<typed_store::rocks::DBMapTableConfigMap>
) -> Self {
let inner = #intermediate_db_map_struct_name::open_tables_impl(path, None, global_db_options_override, tables_db_options_override);
let inner = #intermediate_db_map_struct_name::open_tables_impl(path, None, false, global_db_options_override, tables_db_options_override);
Self {
#(
#field_names: #post_process_fn(inner.#field_names),
)*
}
}

/// This gives info about memory usage and returns a tuple of total table memory usage and cache memory usage
pub fn get_memory_usage(&self) -> Result<(u64, u64), typed_store::rocks::TypedStoreError> {
let stats = rocksdb::perf::get_memory_usage_stats(Some(&[&self.#first_field_name.rocksdb]), None)
.map_err(|e| typed_store::rocks::TypedStoreError::RocksDBError(e.to_string()))?;
Ok((stats.mem_table_total, stats.cache_total))
/// Opens a set of tables in transactional read-write mode
/// Only one process is allowed to do this at a time
/// `global_db_options_override` apply to the whole DB
/// `tables_db_options_override` apply to each table. If `None`, the attributes from `default_options_override_fn` are used if any
#[allow(unused_parens)]
pub fn open_tables_transactional(
path: std::path::PathBuf,
global_db_options_override: Option<rocksdb::Options>,
tables_db_options_override: Option<typed_store::rocks::DBMapTableConfigMap>
) -> Self {
let inner = #intermediate_db_map_struct_name::open_tables_impl(path, None, true, global_db_options_override, tables_db_options_override);
Self {
#(
#field_names: #post_process_fn(inner.#field_names),
)*
}
}

/// Returns a list of the tables name and type pairs
Expand Down Expand Up @@ -514,12 +522,12 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
global_db_options_override: Option<rocksdb::Options>,
) -> Self {
let inner = match with_secondary_path {
Some(q) => #intermediate_db_map_struct_name::open_tables_impl(primary_path, Some(q), global_db_options_override, None),
Some(q) => #intermediate_db_map_struct_name::open_tables_impl(primary_path, Some(q), false, global_db_options_override, None),
None => {
let p: std::path::PathBuf = tempfile::tempdir()
.expect("Failed to open temporary directory")
.into_path();
#intermediate_db_map_struct_name::open_tables_impl(primary_path, Some(p), global_db_options_override, None)
#intermediate_db_map_struct_name::open_tables_impl(primary_path, Some(p), false, global_db_options_override, None)
}
};
Self {
Expand Down
1 change: 1 addition & 0 deletions crates/typed-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ tempfile = "3.3.0"
once_cell = "1.13.0"
proc-macro2 = "1.0.47"
quote = "1.0.9"
rstest = "0.16.0"
syn = { version = "1.0.104", features = ["derive"] }
typed-store-derive = {path = "../typed-store-derive"}
4 changes: 2 additions & 2 deletions crates/typed-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)]

use eyre::Result;
use rocksdb::MultiThreaded;
use serde::{de::DeserializeOwned, Serialize};
use std::{
cmp::Eq,
Expand All @@ -26,6 +25,7 @@ pub mod traits;
pub use traits::Map;
pub mod metrics;
pub mod rocks;
use crate::rocks::RocksDB;
pub use metrics::DBMetrics;

#[cfg(test)]
Expand Down Expand Up @@ -55,7 +55,7 @@ pub enum StoreCommand<Key, Value> {
#[derive(Clone)]
pub struct Store<K, V> {
channel: Sender<StoreCommand<K, V>>,
pub rocksdb: Arc<rocksdb::DBWithThreadMode<MultiThreaded>>,
pub rocksdb: Arc<RocksDB>,
}

impl<Key, Value> Store<Key, Value>
Expand Down
Loading

0 comments on commit f12b4e8

Please sign in to comment.