Skip to content

Commit

Permalink
fix that DB::open_readonly not using the same cf options (aptos-labs#…
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Sep 10, 2024
1 parent f7ec471 commit 0b47a5a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 56 deletions.
2 changes: 1 addition & 1 deletion storage/aptosdb/src/ledger_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl LedgerDb {
&gen_rocksdb_options(db_config, true),
path.clone(),
name,
Self::get_column_families_by_name(name),
Self::gen_cfds_by_name(db_config, name),
)?
} else {
DB::open_cf(
Expand Down
10 changes: 2 additions & 8 deletions storage/aptosdb/src/state_kv_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

use crate::{
common::NUM_STATE_SHARDS,
db_options::{
gen_state_kv_cfds, state_kv_db_column_families, state_kv_db_new_key_column_families,
},
db_options::gen_state_kv_cfds,
metrics::OTHER_TIMERS_SECONDS,
schema::{
db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
Expand Down Expand Up @@ -268,11 +266,7 @@ impl StateKvDb {
&gen_rocksdb_options(state_kv_db_config, true),
path,
name,
if enable_sharding {
state_kv_db_new_key_column_families()
} else {
state_kv_db_column_families()
},
gen_state_kv_cfds(state_kv_db_config, enable_sharding),
)?
} else {
DB::open_cf(
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
common::NUM_STATE_SHARDS,
db_options::{gen_state_merkle_cfds, state_merkle_db_column_families},
db_options::gen_state_merkle_cfds,
lru_node_cache::LruNodeCache,
metrics::{NODE_CACHE_SECONDS, OTHER_TIMERS_SECONDS},
schema::{
Expand Down Expand Up @@ -638,7 +638,7 @@ impl StateMerkleDb {
&gen_rocksdb_options(state_merkle_db_config, true),
path,
name,
state_merkle_db_column_families(),
gen_state_merkle_cfds(state_merkle_db_config),
)?
} else {
DB::open_cf(
Expand Down
109 changes: 73 additions & 36 deletions storage/schemadb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ impl SchemaBatch {
}
}

#[derive(Debug)]
enum OpenMode<'a> {
ReadWrite,
ReadOnly,
Secondary(&'a Path),
}

/// This DB is a schematized RocksDB wrapper where all data passed in and out are typed according to
/// [`Schema`]s.
#[derive(Debug)]
Expand Down Expand Up @@ -143,27 +150,7 @@ impl DB {
name: &str,
cfds: Vec<ColumnFamilyDescriptor>,
) -> DbResult<DB> {
// ignore error, since it'll fail to list cfs on the first open
let existing_cfs = rocksdb::DB::list_cf(db_opts, path.de_unc()).unwrap_or_default();

let unrecognized_cfds = existing_cfs
.iter()
.map(AsRef::as_ref)
.collect::<HashSet<&str>>()
.difference(&cfds.iter().map(|cfd| cfd.name()).collect())
.map(|cf| {
warn!("Unrecognized CF: {}", cf);

let mut cf_opts = Options::default();
cf_opts.set_compression_type(DBCompressionType::Lz4);
ColumnFamilyDescriptor::new(cf.to_string(), cf_opts)
})
.collect::<Vec<_>>();
let all_cfds = cfds.into_iter().chain(unrecognized_cfds);

let inner =
rocksdb::DB::open_cf_descriptors(db_opts, path.de_unc(), all_cfds).into_db_res()?;
Ok(Self::log_construct(name, inner))
Self::open_cf_impl(db_opts, path, name, cfds, OpenMode::ReadWrite)
}

/// Open db in readonly mode
Expand All @@ -173,35 +160,85 @@ impl DB {
opts: &Options,
path: impl AsRef<Path>,
name: &str,
cfs: Vec<ColumnFamilyName>,
cfds: Vec<ColumnFamilyDescriptor>,
) -> DbResult<DB> {
let error_if_log_file_exists = false;
let inner =
rocksdb::DB::open_cf_for_read_only(opts, path.de_unc(), cfs, error_if_log_file_exists)
.into_db_res()?;

Ok(Self::log_construct(name, inner))
Self::open_cf_impl(opts, path, name, cfds, OpenMode::ReadOnly)
}

pub fn open_cf_as_secondary<P: AsRef<Path>>(
opts: &Options,
primary_path: P,
secondary_path: P,
name: &str,
cfs: Vec<ColumnFamilyName>,
cfds: Vec<ColumnFamilyDescriptor>,
) -> DbResult<DB> {
let inner = rocksdb::DB::open_cf_as_secondary(
Self::open_cf_impl(
opts,
primary_path.de_unc(),
secondary_path.de_unc(),
cfs,
primary_path,
name,
cfds,
OpenMode::Secondary(secondary_path.as_ref()),
)
}

fn open_cf_impl(
db_opts: &Options,
path: impl AsRef<Path>,
name: &str,
cfds: Vec<ColumnFamilyDescriptor>,
open_mode: OpenMode,
) -> DbResult<DB> {
// ignore error, since it'll fail to list cfs on the first open
let existing_cfs = rocksdb::DB::list_cf(db_opts, path.de_unc()).unwrap_or_default();

let unrecognized_cfds = existing_cfs
.iter()
.map(AsRef::as_ref)
.collect::<HashSet<&str>>()
.difference(&cfds.iter().map(|cfd| cfd.name()).collect())
.map(|cf| {
warn!("Unrecognized CF: {}", cf);

let mut cf_opts = Options::default();
cf_opts.set_compression_type(DBCompressionType::Lz4);
ColumnFamilyDescriptor::new(cf.to_string(), cf_opts)
})
.collect::<Vec<_>>();
let all_cfds = cfds.into_iter().chain(unrecognized_cfds);

let inner = {
use rocksdb::DB;
use OpenMode::*;

match open_mode {
ReadWrite => DB::open_cf_descriptors(db_opts, path.de_unc(), all_cfds),
ReadOnly => {
DB::open_cf_descriptors_read_only(
db_opts,
path.de_unc(),
all_cfds,
false, /* error_if_log_file_exist */
)
},
Secondary(secondary_path) => DB::open_cf_descriptors_as_secondary(
db_opts,
path.de_unc(),
secondary_path,
all_cfds,
),
}
}
.into_db_res()?;
Ok(Self::log_construct(name, inner))

Ok(Self::log_construct(name, open_mode, inner))
}

fn log_construct(name: &str, inner: rocksdb::DB) -> DB {
info!(rocksdb_name = name, "Opened RocksDB.");
fn log_construct(name: &str, open_mode: OpenMode, inner: rocksdb::DB) -> DB {
info!(
rocksdb_name = name,
open_mode = ?open_mode,
"Opened RocksDB."
);
DB {
name: name.to_string(),
inner,
Expand Down
20 changes: 11 additions & 9 deletions storage/schemadb/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_schemadb::{
};
use aptos_storage_interface::AptosDbError;
use byteorder::{LittleEndian, ReadBytesExt};
use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use rocksdb::{ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME};

// Creating two schemas that share exactly the same structure but are stored in different column
// families. Also note that the key and value are of the same type `TestField`. By implementing
Expand Down Expand Up @@ -81,6 +81,13 @@ fn get_column_families() -> Vec<ColumnFamilyName> {
]
}

fn get_cfds() -> Vec<ColumnFamilyDescriptor> {
get_column_families()
.iter()
.map(|cf_name| ColumnFamilyDescriptor::new(*cf_name, rocksdb::Options::default()))
.collect()
}

fn open_db(dir: &aptos_temppath::TempPath) -> DB {
let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
Expand All @@ -89,13 +96,8 @@ fn open_db(dir: &aptos_temppath::TempPath) -> DB {
}

fn open_db_read_only(dir: &aptos_temppath::TempPath) -> DB {
DB::open_cf_readonly(
&rocksdb::Options::default(),
dir.path(),
"test",
get_column_families(),
)
.expect("Failed to open DB.")
DB::open_cf_readonly(&rocksdb::Options::default(), dir.path(), "test", get_cfds())
.expect("Failed to open DB.")
}

fn open_db_as_secondary(dir: &aptos_temppath::TempPath, dir_sec: &aptos_temppath::TempPath) -> DB {
Expand All @@ -104,7 +106,7 @@ fn open_db_as_secondary(dir: &aptos_temppath::TempPath, dir_sec: &aptos_temppath
dir.path(),
dir_sec.path(),
"test",
get_column_families(),
get_cfds(),
)
.expect("Failed to open DB.")
}
Expand Down

0 comments on commit 0b47a5a

Please sign in to comment.