Make types of Immutable/Mutable Options fields match that of the underlying Option (facebook#8176)

Summary:
This PR is a first step toward cleaning up the Mutable/Immutable Options code. With this change, a DBOptions and a ColumnFamilyOptions can be reconstructed from their Mutable and Immutable equivalents, respectively.

readrandom tests do not show any performance degradation versus master (though both are slightly slower than the current 6.19 release).

There are still fields in ImmutableCFOptions that are DB options rather than CF options. Eventually, I would like to move those into an ImmutableOptions (= ImmutableDBOptions + ImmutableCFOptions), but that is left for a future PR to keep the changes and disruption here to a minimum.
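The recurring pattern in the hunks below: option fields that the immutable options structs previously stored as raw pointers now keep the same smart-pointer type as the corresponding ColumnFamilyOptions/DBOptions field, so call sites that need a raw pointer add .get(). A minimal sketch of the idea, using simplified stand-in types rather than the actual RocksDB declarations:

#include <memory>

// Hypothetical, simplified stand-ins for the real RocksDB types.
struct MergeOperator {};

struct ColumnFamilyOptions {
  std::shared_ptr<MergeOperator> merge_operator;  // user-supplied, shared ownership
};

// Before this change, an immutable copy would hold a raw pointer, e.g.:
//   const MergeOperator* merge_operator;
// After it, the field keeps the same type as the ColumnFamilyOptions field,
// so the original options can be rebuilt from the Immutable/Mutable pieces.
struct ImmutableCFOptions {
  explicit ImmutableCFOptions(const ColumnFamilyOptions& cf_options)
      : merge_operator(cf_options.merge_operator) {}

  std::shared_ptr<MergeOperator> merge_operator;
};

// Call sites that want a raw pointer now add .get(), as in the hunks below.
void UseMergeOperator(const MergeOperator* merge_op) { (void)merge_op; }

int main() {
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = std::make_shared<MergeOperator>();

  ImmutableCFOptions ioptions(cf_options);
  UseMergeOperator(ioptions.merge_operator.get());
  return 0;
}

The renamed WAL fields in the diff (WAL_ttl_seconds, WAL_size_limit_MB) serve the same goal, adopting the member names used by DBOptions so the immutable copy mirrors the original option.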

Pull Request resolved: facebook#8176

Reviewed By: pdillinger

Differential Revision: D27954339

Pulled By: mrambacher

fbshipit-source-id: ec6b805ba9afe6e094bffdbd76246c2d99aa9fad
mrambacher authored and facebook-github-bot committed Apr 23, 2021
1 parent f0fca2b commit 01e460d
Showing 27 changed files with 276 additions and 129 deletions.
2 changes: 1 addition & 1 deletion db/blob/blob_file_builder.cc
@@ -190,7 +190,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
std::move(file), blob_file_paths_->back(), *file_options_,
immutable_cf_options_->clock, io_tracer_, statistics,
immutable_cf_options_->listeners,
- immutable_cf_options_->file_checksum_gen_factory,
+ immutable_cf_options_->file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kBlobFile)));

constexpr bool do_flush = false;
2 changes: 1 addition & 1 deletion db/blob/blob_file_reader.cc
@@ -120,7 +120,7 @@ Status BlobFileReader::OpenFile(
file_reader->reset(new RandomAccessFileReader(
std::move(file), blob_file_path, immutable_cf_options.clock, io_tracer,
immutable_cf_options.statistics, BLOB_DB_BLOB_FILE_READ_MICROS,
- blob_file_read_hist, immutable_cf_options.rate_limiter,
+ blob_file_read_hist, immutable_cf_options.rate_limiter.get(),
immutable_cf_options.listeners));

return Status::OK();
4 changes: 2 additions & 2 deletions db/builder.cc
@@ -156,7 +156,7 @@ Status BuildTable(
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, ioptions.clock, io_tracer,
ioptions.statistics, ioptions.listeners,
- ioptions.file_checksum_gen_factory,
+ ioptions.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));

builder = NewTableBuilder(
@@ -168,7 +168,7 @@ }
}

MergeHelper merge(env, internal_comparator.user_comparator(),
- ioptions.merge_operator, nullptr, ioptions.info_log,
+ ioptions.merge_operator.get(), nullptr, ioptions.info_log,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
2 changes: 1 addition & 1 deletion db/compaction/compaction_job.cc
@@ -965,7 +965,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

MergeHelper merge(
- env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
+ env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
compaction_filter, db_options_.info_log.get(),
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.cc
@@ -697,8 +697,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}

const Status DBImpl::CreateArchivalDirectory() {
- if (immutable_db_options_.wal_ttl_seconds > 0 ||
- immutable_db_options_.wal_size_limit_mb > 0) {
+ if (immutable_db_options_.WAL_ttl_seconds > 0 ||
+ immutable_db_options_.WAL_size_limit_MB > 0) {
std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
return env_->CreateDirIfMissing(archivalPath);
}
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_files.cc
@@ -558,8 +558,8 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}

#ifndef ROCKSDB_LITE
- if (type == kWalFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
- immutable_db_options_.wal_size_limit_mb > 0)) {
+ if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 ||
+ immutable_db_options_.WAL_size_limit_MB > 0)) {
wal_manager_.ArchiveWALFile(fname, number);
continue;
}
4 changes: 2 additions & 2 deletions db/db_iter.cc
@@ -45,10 +45,10 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
ColumnFamilyData* cfd, bool expose_blob_index)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
env_(_env),
- clock_(_env->GetSystemClock().get()),
+ clock_(cf_options.clock),
logger_(cf_options.info_log),
user_comparator_(cmp),
- merge_operator_(cf_options.merge_operator),
+ merge_operator_(cf_options.merge_operator.get()),
iter_(iter),
version_(version),
read_callback_(read_callback),
2 changes: 1 addition & 1 deletion db/internal_stats.cc
@@ -961,7 +961,7 @@ bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,

bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
assert(block_cache != nullptr);
- auto* table_factory = cfd_->ioptions()->table_factory;
+ auto* table_factory = cfd_->ioptions()->table_factory.get();
assert(table_factory != nullptr);
*block_cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
4 changes: 2 additions & 2 deletions db/memtable.cc
@@ -59,7 +59,7 @@ ImmutableMemTableOptions::ImmutableMemTableOptions(
inplace_callback(ioptions.inplace_callback),
max_successive_merges(mutable_cf_options.max_successive_merges),
statistics(ioptions.statistics),
- merge_operator(ioptions.merge_operator),
+ merge_operator(ioptions.merge_operator.get()),
info_log(ioptions.info_log),
allow_data_in_errors(ioptions.allow_data_in_errors) {}

@@ -106,7 +106,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_state_(FLUSH_NOT_REQUESTED),
clock_(ioptions.clock),
insert_with_hint_prefix_extractor_(
- ioptions.memtable_insert_with_hint_prefix_extractor),
+ ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
2 changes: 1 addition & 1 deletion db/table_cache.cc
@@ -130,7 +130,7 @@ Status TableCache::GetTableReader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
- file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
+ file_read_hist, ioptions_.rate_limiter.get(), ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
ro,
TableReaderOptions(ioptions_, prefix_extractor, file_options,
4 changes: 2 additions & 2 deletions db/version_set.cc
@@ -1768,8 +1768,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
: cfd_->ioptions()->statistics),
table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
- merge_operator_((cfd_ == nullptr) ? nullptr
- : cfd_->ioptions()->merge_operator),
+ merge_operator_(
+ (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()),
storage_info_(
(cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
(cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
12 changes: 6 additions & 6 deletions db/wal_manager.cc
@@ -134,8 +134,8 @@ Status WalManager::GetUpdatesSince(
// b. get sorted non-empty archived logs
// c. delete what should be deleted
void WalManager::PurgeObsoleteWALFiles() {
- bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
- bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
+ bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
+ bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
if (!ttl_enabled && !size_limit_enabled) {
return;
}
@@ -150,7 +150,7 @@ void WalManager::PurgeObsoleteWALFiles() {
}
uint64_t const now_seconds = static_cast<uint64_t>(current_time);
uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
- ? db_options_.wal_ttl_seconds / 2
+ ? db_options_.WAL_ttl_seconds / 2
: kDefaultIntervalToDeleteObsoleteWAL;

if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
@@ -185,7 +185,7 @@ void WalManager::PurgeObsoleteWALFiles() {
s.ToString().c_str());
continue;
}
- if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
+ if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
@@ -234,8 +234,8 @@ void WalManager::PurgeObsoleteWALFiles() {
return;
}

- size_t const files_keep_num =
- static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size);
+ size_t const files_keep_num = static_cast<size_t>(
+ db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size);
if (log_files_num <= files_keep_num) {
return;
}
16 changes: 8 additions & 8 deletions db/wal_manager_test.cc
@@ -217,16 +217,16 @@ int CountRecords(TransactionLogIterator* iter) {
} // namespace

TEST_F(WalManagerTest, WALArchivalSizeLimit) {
- db_options_.wal_ttl_seconds = 0;
- db_options_.wal_size_limit_mb = 1000;
+ db_options_.WAL_ttl_seconds = 0;
+ db_options_.WAL_size_limit_MB = 1000;
Init();

// TEST : Create WalManager with huge size limit and no ttl.
// Create some archived files and call PurgeObsoleteWALFiles().
// Count the archived log files that survived.
// Assert that all of them did.
// Change size limit. Re-open WalManager.
- // Assert that archive is not greater than wal_size_limit_mb after
+ // Assert that archive is not greater than WAL_size_limit_MB after
// PurgeObsoleteWALFiles()
// Set ttl and time_to_check_ to small values. Re-open db.
// Assert that there are no archived logs left.
@@ -238,14 +238,14 @@ TEST_F(WalManagerTest, WALArchivalSizeLimit) {
ListSpecificFiles(env_.get(), archive_dir, kWalFile);
ASSERT_EQ(log_files.size(), 20U);

- db_options_.wal_size_limit_mb = 8;
+ db_options_.WAL_size_limit_MB = 8;
Reopen();
wal_manager_->PurgeObsoleteWALFiles();

uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
- ASSERT_TRUE(archive_size <= db_options_.wal_size_limit_mb * 1024 * 1024);
+ ASSERT_TRUE(archive_size <= db_options_.WAL_size_limit_MB * 1024 * 1024);

- db_options_.wal_ttl_seconds = 1;
+ db_options_.WAL_ttl_seconds = 1;
env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
Reopen();
wal_manager_->PurgeObsoleteWALFiles();
@@ -255,7 +255,7 @@ TEST_F(WalManagerTest, WALArchivalSizeLimit) {
}

TEST_F(WalManagerTest, WALArchivalTtl) {
- db_options_.wal_ttl_seconds = 1000;
+ db_options_.WAL_ttl_seconds = 1000;
Init();

// TEST : Create WalManager with a ttl and no size limit.
@@ -271,7 +271,7 @@ TEST_F(WalManagerTest, WALArchivalTtl) {
ListSpecificFiles(env_.get(), archive_dir, kWalFile);
ASSERT_GT(log_files.size(), 0U);

- db_options_.wal_ttl_seconds = 1;
+ db_options_.WAL_ttl_seconds = 1;
env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
Reopen();
wal_manager_->PurgeObsoleteWALFiles();