Skip to content

Commit

Permalink
Option to fail a request as incomplete when skipping too many interna…
Browse files Browse the repository at this point in the history
…l keys

Summary:
Operations like Seek/Next/Prev sometimes take too long to complete when there are many internal keys to be skipped. This change adds an option, max_skippable_internal_keys, which sets a threshold on the maximum number of internal keys that can be skipped. It addresses cases where it is much better to fail a request (as incomplete) than to wait a considerable time for the request to complete.

This feature -- failing an iterator seek request as incomplete -- is disabled by default (max_skippable_internal_keys = 0). It is enabled only when max_skippable_internal_keys > 0.

This feature is based on the discussion mentioned in the PR facebook#1084.
Closes facebook#2000

Differential Revision: D4753223

Pulled By: sagar0

fbshipit-source-id: 1c973f7
  • Loading branch information
sagar0 authored and facebook-github-bot committed Mar 30, 2017
1 parent 58179ec commit c6d04f2
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 24 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased
### Public API Change
* Support dynamically change `stats_dump_period_sec` option via SetDBOptions().
* Added ReadOptions::max_skippable_internal_keys to set a threshold to fail a request as incomplete when too many keys are being skipped when using iterators.

### New Features
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.
Expand Down
16 changes: 11 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4442,7 +4442,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_options.iterate_upper_bound,
read_options.prefix_same_as_start, read_options.pin_data);
read_options.prefix_same_as_start, read_options.pin_data,
read_options.total_order_seek,
read_options.max_skippable_internal_keys);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
Expand Down Expand Up @@ -4501,7 +4503,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_options.iterate_upper_bound,
read_options.prefix_same_as_start, read_options.pin_data,
read_options.total_order_seek);
read_options.total_order_seek,
read_options.max_skippable_internal_keys);

InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
Expand Down Expand Up @@ -4553,7 +4556,9 @@ Status DBImpl::NewIterators(
env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, nullptr, false, read_options.pin_data));
sv->version_number, nullptr, false, read_options.pin_data,
read_options.total_order_seek,
read_options.max_skippable_internal_keys));
}
#endif
} else {
Expand All @@ -4573,7 +4578,9 @@ Status DBImpl::NewIterators(
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, nullptr, false, read_options.pin_data);
sv->version_number, nullptr, false, read_options.pin_data,
read_options.total_order_seek,
read_options.max_skippable_internal_keys);
InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
Expand Down Expand Up @@ -5160,7 +5167,6 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
env_->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();

}

while (bg_error_.ok() && write_controller_.IsStopped()) {
Expand Down
9 changes: 7 additions & 2 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
->number_
: latest_snapshot),
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number);
super_version->version_number, read_options.iterate_upper_bound,
read_options.prefix_same_as_start, read_options.pin_data,
read_options.total_order_seek, read_options.max_skippable_internal_keys);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
Expand Down Expand Up @@ -94,7 +96,10 @@ Status DBImplReadOnly::NewIterators(
->number_
: latest_snapshot),
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number);
sv->version_number, read_options.iterate_upper_bound,
read_options.prefix_same_as_start, read_options.pin_data,
read_options.total_order_seek,
read_options.max_skippable_internal_keys);
auto* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
Expand Down
70 changes: 59 additions & 11 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class DBIter: public Iterator {
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false, bool pin_data = false,
bool total_order_seek = false)
bool total_order_seek = false,
uint64_t max_skippable_internal_keys = 0)
: arena_mode_(arena_mode),
env_(env),
logger_(ioptions.info_log),
Expand All @@ -128,6 +129,7 @@ class DBIter: public Iterator {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = ioptions.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
max_skippable_internal_keys_ = max_skippable_internal_keys;
if (pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
Expand Down Expand Up @@ -224,6 +226,7 @@ class DBIter: public Iterator {
void FindNextUserEntryInternal(bool skipping, bool prefix_check);
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
bool TooManyInternalKeysSkipped(bool increment = true);

// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
Expand All @@ -249,6 +252,10 @@ class DBIter: public Iterator {
}
}

// Sets the skipped-internal-keys counter back to zero. The visible call sites
// (Next, Prev, Seek, SeekForPrev, SeekToFirst, SeekToLast) invoke this right
// after ReleaseTempPinnedData(), so the count consulted by
// TooManyInternalKeysSkipped() restarts with each of those operations.
inline void ResetInternalKeysSkippedCounter() {
num_internal_keys_skipped_ = 0;
}

const SliceTransform* prefix_extractor_;
bool arena_mode_;
Env* const env_;
Expand All @@ -268,6 +275,8 @@ class DBIter: public Iterator {
// for prefix seek mode to support prev()
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
uint64_t num_internal_keys_skipped_;
uint64_t version_number_;
const Slice* iterate_upper_bound_;
IterKey prefix_start_buf_;
Expand Down Expand Up @@ -304,6 +313,7 @@ void DBIter::Next() {

// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
if (direction_ == kReverse) {
ReverseToForward();
} else if (iter_->Valid() && !current_entry_is_merged_) {
Expand Down Expand Up @@ -390,6 +400,10 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
break;
}

if (TooManyInternalKeysSkipped()) {
return;
}

if (ikey.sequence <= sequence_) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
Expand Down Expand Up @@ -580,6 +594,7 @@ void DBIter::MergeValuesNewToOld() {
void DBIter::Prev() {
assert(valid_);
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
if (direction_ == kForward) {
ReverseToBackward();
}
Expand Down Expand Up @@ -658,6 +673,7 @@ void DBIter::PrevInternal() {
while (iter_->Valid()) {
saved_key_.SetKey(ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

if (FindValueForCurrentKey()) {
valid_ = true;
if (!iter_->Valid()) {
Expand All @@ -674,6 +690,11 @@ void DBIter::PrevInternal() {
}
return;
}

if (TooManyInternalKeysSkipped(false)) {
return;
}

if (!iter_->Valid()) {
break;
}
Expand Down Expand Up @@ -709,6 +730,10 @@ bool DBIter::FindValueForCurrentKey() {
size_t num_skipped = 0;
while (iter_->Valid() && ikey.sequence <= sequence_ &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
if (TooManyInternalKeysSkipped()) {
return false;
}

// We iterate too much: let's use Seek() to avoid too many key comparisons
if (num_skipped >= max_skip_) {
return FindValueForCurrentKeyUsingSeek();
Expand Down Expand Up @@ -908,6 +933,10 @@ void DBIter::FindPrevUserKey() {
while (iter_->Valid() && ((cmp = user_comparator_->Compare(
ikey.user_key, saved_key_.GetKey())) == 0 ||
(cmp > 0 && ikey.sequence > sequence_))) {
if (TooManyInternalKeysSkipped()) {
return;
}

if (cmp == 0) {
if (num_skipped >= max_skip_) {
num_skipped = 0;
Expand All @@ -930,6 +959,18 @@ void DBIter::FindPrevUserKey() {
}
}

// Checks the skipped-internal-keys counter against the configured threshold.
//
// When max_skippable_internal_keys_ is non-zero and the counter already
// exceeds it, the iterator is marked invalid, status_ is set to
// Status::Incomplete, and true is returned. Otherwise the counter is bumped
// by one if `increment` is set, and false is returned.
//
// Note that a threshold of 0 disables the check only; the counter is still
// incremented when `increment` is true.
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  const bool limit_enabled = max_skippable_internal_keys_ > 0;
  if (limit_enabled &&
      num_internal_keys_skipped_ > max_skippable_internal_keys_) {
    // Over the limit: fail the current operation instead of scanning further.
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  }

  if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
while (iter_->Valid() && !ParseKey(ikey)) {
Expand All @@ -944,6 +985,7 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
saved_key_.Clear();
saved_key_.SetInternalKey(target, sequence_);

Expand Down Expand Up @@ -985,6 +1027,7 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekForPrev(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
saved_key_.Clear();
// now saved_key is used to store internal key.
saved_key_.SetInternalKey(target, 0 /* sequence_number */,
Expand Down Expand Up @@ -1030,6 +1073,7 @@ void DBIter::SeekToFirst() {
}
direction_ = kForward;
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
ClearSavedValue();

{
Expand Down Expand Up @@ -1066,6 +1110,7 @@ void DBIter::SeekToLast() {
}
direction_ = kReverse;
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
ClearSavedValue();

{
Expand Down Expand Up @@ -1105,11 +1150,13 @@ Iterator* NewDBIterator(
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, const Slice* iterate_upper_bound,
bool prefix_same_as_start, bool pin_data, bool total_order_seek) {
DBIter* db_iter = new DBIter(
env, ioptions, user_key_comparator, internal_iter, sequence, false,
max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
prefix_same_as_start, pin_data, total_order_seek);
bool prefix_same_as_start, bool pin_data, bool total_order_seek,
uint64_t max_skippable_internal_keys) {
DBIter* db_iter =
new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start, pin_data,
total_order_seek, max_skippable_internal_keys);
return db_iter;
}

Expand Down Expand Up @@ -1153,14 +1200,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound, bool prefix_same_as_start, bool pin_data,
bool total_order_seek) {
bool total_order_seek, uint64_t max_skippable_internal_keys) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem) DBIter(
env, ioptions, user_key_comparator, nullptr, sequence, true,
max_sequential_skip_in_iterations, version_number, iterate_upper_bound,
prefix_same_as_start, pin_data, total_order_seek);
DBIter* db_iter =
new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
true, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start, pin_data,
total_order_seek, max_skippable_internal_keys);

iter->SetDBIter(db_iter);

Expand Down
4 changes: 2 additions & 2 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extern Iterator* NewDBIterator(
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false, bool pin_data = false,
bool total_order_seek = false);
bool total_order_seek = false, uint64_t max_skippable_internal_keys = 0);

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -82,6 +82,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false, bool pin_data = false,
bool total_order_seek = false);
bool total_order_seek = false, uint64_t max_skippable_internal_keys = 0);

} // namespace rocksdb
Loading

0 comments on commit c6d04f2

Please sign in to comment.