Skip to content

Commit

Permalink
reduce references to cfd->options() in DBImpl
Browse files Browse the repository at this point in the history
Summary:
I found it is almost impossible to get rid of this function in a single
batch. I will take a step by step approach

Test Plan: make release

Reviewers: sdong, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22995
  • Loading branch information
Lei Jin committed Sep 8, 2014
1 parent 011241b commit 048560a
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 138 deletions.
10 changes: 5 additions & 5 deletions db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}

bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
}
// Maybe use binary search to find right entry instead of linear search?
Expand Down Expand Up @@ -177,8 +177,8 @@ void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {

// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool is_manual) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that
// all files will be picked in a single compaction
Expand Down Expand Up @@ -270,7 +270,7 @@ void Compaction::Summary(char* output, int len) {
uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t preallocation_size = 0;

if (cfd_->options()->compaction_style == kCompactionStyleLevel) {
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
preallocation_size =
cfd_->compaction_picker()->MaxFileSizeForLevel(output_level());
} else {
Expand Down
103 changes: 55 additions & 48 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,24 +294,24 @@ Status SanitizeDBOptionsByCFOptions(
return Status::OK();
}

CompressionType GetCompressionFlush(const Options& options) {
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) {
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.

bool can_compress;

if (options.compaction_style == kCompactionStyleUniversal) {
if (ioptions.compaction_style == kCompactionStyleUniversal) {
can_compress =
(options.compaction_options_universal.compression_size_percent < 0);
(ioptions.compaction_options_universal.compression_size_percent < 0);
} else {
// For leveled compress when min_level_to_compress == 0.
can_compress = options.compression_per_level.empty() ||
options.compression_per_level[0] != kNoCompression;
can_compress = ioptions.compression_per_level.empty() ||
ioptions.compression_per_level[0] != kNoCompression;
}

if (can_compress) {
return options.compression;
return ioptions.compression;
} else {
return kNoCompression;
}
Expand Down Expand Up @@ -1424,8 +1424,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->options()),
cfd->options()->compression_opts, Env::IO_HIGH);
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
mutex_.Lock();
}
Expand Down Expand Up @@ -1498,8 +1498,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->options()),
cfd->options()->compression_opts, Env::IO_HIGH);
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
}
Log(db_options_.info_log,
Expand Down Expand Up @@ -1537,7 +1537,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
// threads could be concurrently producing compacted files for
// that key range.
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
cfd->options()->compaction_style == kCompactionStyleLevel) {
cfd->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
Expand Down Expand Up @@ -1666,8 +1666,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
// bottom-most level, the output level will be the same as input one.
// level 0 can never be the bottommost level (i.e. if all files are in level
// 0, we will compact to level 1)
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO ||
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO ||
(level == max_level_with_files && level > 0)) {
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
} else {
Expand Down Expand Up @@ -1828,16 +1828,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
// For universal compaction, we enforce every manual compaction to compact
// all files.
if (begin == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
Expand Down Expand Up @@ -2288,7 +2288,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// file if there is alive snapshot pointing to it
assert(c->num_input_files(1) == 0);
assert(c->level() == 0);
assert(c->column_family_data()->options()->compaction_style ==
assert(c->column_family_data()->ioptions()->compaction_style ==
kCompactionStyleFIFO);
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
Expand Down Expand Up @@ -2371,8 +2371,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
// Universal and FIFO compactions should always compact the whole range
assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO);
assert(m->cfd->ioptions()->compaction_style != kCompactionStyleUniversal);
assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end;
m->begin = &m->tmp_storage;
}
Expand Down Expand Up @@ -2465,7 +2465,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
compact->compaction->OutputCompressionType(),
cfd->options()->compression_opts));
cfd->ioptions()->compression_opts));
}
LogFlush(db_options_.info_log);
return s;
Expand Down Expand Up @@ -2640,7 +2640,7 @@ Status DBImpl::ProcessKeyValueCompaction(
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
ColumnFamilyData* cfd = compact->compaction->column_family_data();
MergeHelper merge(
cfd->user_comparator(), cfd->options()->merge_operator.get(),
cfd->user_comparator(), cfd->ioptions()->merge_operator,
db_options_.info_log.get(), cfd->options()->min_partial_merge_operands,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->options()->compaction_filter;
Expand Down Expand Up @@ -3673,30 +3673,31 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
return s.ok() || s.IsIncomplete();
}

Iterator* DBImpl::NewIterator(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();

if (options.tailing) {
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
// not supported in lite version
return nullptr;
#else
// TODO(ljin): remove tailing iterator
auto iter = new ForwardIterator(this, options, cfd);
return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber, options.iterate_upper_bound);
// return new TailingIterator(env_, this, options, cfd);
auto iter = new ForwardIterator(this, read_options, cfd);
return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
cfd->options()->max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr;
sv = cfd->GetReferencedSuperVersion(&mutex_);

auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot;

// Try to generate a DB iterator tree in continuous memory area to be
Expand Down Expand Up @@ -3742,19 +3743,22 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(),
snapshot, options.iterate_upper_bound);
env_, *cfd->ioptions(), cfd->user_comparator(),
snapshot, cfd->options()->max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);

Iterator* internal_iter =
NewInternalIterator(options, cfd, sv, db_iter->GetArena());
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);

return db_iter;
}
// To stop compiler from complaining
return nullptr;
}

Status DBImpl::NewIterators(
const ReadOptions& options,
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
iterators->clear();
Expand All @@ -3763,7 +3767,7 @@ Status DBImpl::NewIterators(
std::vector<SuperVersion*> super_versions;
super_versions.reserve(column_families.size());

if (!options.tailing) {
if (!read_options.tailing) {
mutex_.Lock();
latest_snapshot = versions_->LastSequence();
for (auto cfh : column_families) {
Expand All @@ -3773,17 +3777,18 @@ Status DBImpl::NewIterators(
mutex_.Unlock();
}

if (options.tailing) {
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
return Status::InvalidArgument(
"Tailing interator not supported in RocksDB lite");
#else
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto iter = new ForwardIterator(this, options, cfd);
auto iter = new ForwardIterator(this, read_options, cfd);
iterators->push_back(
NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber));
NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
cfd->options()->max_sequential_skip_in_iterations));
}
#endif
} else {
Expand All @@ -3792,14 +3797,16 @@ Status DBImpl::NewIterators(
auto cfd = cfh->cfd();

auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot;

ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(), snapshot);
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
cfd->options()->max_sequential_skip_in_iterations);
Iterator* internal_iter = NewInternalIterator(
options, cfd, super_versions[i], db_iter->GetArena());
read_options, cfd, super_versions[i], db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
Expand Down Expand Up @@ -3838,7 +3845,7 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
if (!cfh->cfd()->options()->merge_operator) {
if (!cfh->cfd()->ioptions()->merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
return DB::Merge(o, column_family, key, val);
Expand Down Expand Up @@ -4814,8 +4821,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,

if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i);
Expand All @@ -4827,7 +4834,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
}
}
if (cfd->options()->merge_operator != nullptr &&
if (cfd->ioptions()->merge_operator != nullptr &&
!cfd->mem()->IsMergeOperatorSupported()) {
s = Status::InvalidArgument(
"The memtable of column family %s does not support merge operator "
Expand Down
30 changes: 17 additions & 13 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,27 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
return s;
}

Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options,
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
SequenceNumber latest_snapshot = versions_->LastSequence();
auto db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(),
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
auto internal_iter =
NewInternalIterator(options, cfd, super_version, db_iter->GetArena());
env_, *cfd->ioptions(), cfd->user_comparator(),
(read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot),
cfd->options()->max_sequential_skip_in_iterations);
auto internal_iter = NewInternalIterator(
read_options, cfd, super_version, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}

Status DBImplReadOnly::NewIterators(
const ReadOptions& options,
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (iterators == nullptr) {
Expand All @@ -100,12 +102,14 @@ Status DBImplReadOnly::NewIterators(
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(),
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot);
env_, *cfd->ioptions(), cfd->user_comparator(),
(read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot),
cfd->options()->max_sequential_skip_in_iterations);
auto internal_iter = NewInternalIterator(
options, cfd, cfd->GetSuperVersion()->Ref(), db_iter->GetArena());
read_options, cfd, cfd->GetSuperVersion()->Ref(), db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
Expand Down
Loading

0 comments on commit 048560a

Please sign in to comment.