Skip to content

Commit

Permalink
Fix a bug in range scan with merge and deletion with timestamp (faceb…
Browse files Browse the repository at this point in the history
…ook#10915)

Summary:
When performing a Merge during range scan, the iterator should recognize the value type kTypeDeletionWithTimestamp as terminating the merge chain.

Also add an additional check in debug mode to MergeHelper, and account for the presence of compaction filter.

Pull Request resolved: facebook#10915

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D40960039

Pulled By: riversand963

fbshipit-source-id: dd79d86d7c79d05755bb939a3d94e0c53ddd7f59
  • Loading branch information
riversand963 authored and facebook-github-bot committed Nov 3, 2022
1 parent 941d834 commit 18cb731
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 7 deletions.
9 changes: 6 additions & 3 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ bool DBIter::MergeValuesNewToOld() {
// hit the next user key, stop right here
break;
}
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
kTypeDeletionWithTimestamp == ikey.type) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_.Next();
Expand Down Expand Up @@ -957,7 +958,8 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeMerge:
current_entry_is_merged_ = true;
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion) {
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeDeletionWithTimestamp) {
s = Merge(nullptr, saved_key_.GetUserKey());
if (!s.ok()) {
return false;
Expand Down Expand Up @@ -1164,7 +1166,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
ikey.type == kTypeDeletionWithTimestamp) {
break;
}
if (!iter_.PrepareValue()) {
Expand Down
71 changes: 71 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3799,6 +3799,77 @@ TEST_F(DBBasicTestWithTimestamp, MergeBasic) {

Close();
}

TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
  DestroyAndReopen(options);

  ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily();

  const size_t num_keys_per_file = 10;
  const size_t num_merges_per_key = 2;
  // For each key, write a Delete at ts = i + 10000 followed by
  // num_merges_per_key Merge operands at strictly increasing timestamps.
  // A reader at a later timestamp must see the merge operands applied on
  // top of the deletion (i.e. the deletion terminates the merge chain).
  for (size_t i = 0; i < num_keys_per_file; ++i) {
    std::string ts = Timestamp(i + 10000, 0);
    ASSERT_OK(db_->Delete(WriteOptions(), Key1(i), ts));
    for (size_t j = 1; j <= num_merges_per_key; ++j) {
      ts = Timestamp(i + 10000 + j, 0);
      ASSERT_OK(db_->Merge(WriteOptions(), column_family, Key1(i), ts,
                           std::to_string(j)));
    }
  }

  // The merged value is identical for every key ("1.2" when
  // num_merges_per_key == 2): compute it once instead of rebuilding it on
  // every iteration of both verification loops below.
  std::string expected_value;
  for (size_t j = 1; j <= num_merges_per_key; ++j) {
    expected_value.append(std::to_string(j));
    if (j < num_merges_per_key) {
      expected_value.push_back('.');
    }
  }

  const auto verify_db = [&]() {
    ReadOptions read_opts;
    std::string read_ts_str = Timestamp(20000, 0);
    Slice ts = read_ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
    size_t count = 0;
    // Forward scan: keys come back in ascending order, each carrying the
    // merged value and the timestamp of its newest merge operand.
    for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) {
      ASSERT_EQ(Key1(count), it->key());
      ASSERT_EQ(expected_value, it->value());
      ASSERT_EQ(Timestamp(count + 10000 + num_merges_per_key, 0),
                it->timestamp());
    }
    ASSERT_OK(it->status());
    ASSERT_EQ(num_keys_per_file, count);
    // Backward scan: must observe exactly the same entries in reverse;
    // count returns to 0 when every key has been seen once.
    for (it->SeekToLast(); it->Valid(); it->Prev(), --count) {
      ASSERT_EQ(Key1(count - 1), it->key());
      ASSERT_EQ(expected_value, it->value());
      ASSERT_EQ(Timestamp(count - 1 + 10000 + num_merges_per_key, 0),
                it->timestamp());
    }
    ASSERT_OK(it->status());
    ASSERT_EQ(0, count);
  };

  verify_db();

  Close();
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
16 changes: 12 additions & 4 deletions db/merge_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "db/compaction/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/wide/wide_column_serialization.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/likely.h"
Expand Down Expand Up @@ -255,6 +256,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
assert(s.ok());
if (!s.ok()) return s;

assert(kTypeMerge == orig_ikey.type);

bool hit_the_next_user_key = false;
int cmp_with_full_history_ts_low = 0;
for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
Expand Down Expand Up @@ -460,10 +463,15 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
}

if (cmp_with_full_history_ts_low >= 0) {
// If we reach here, and ts_sz == 0, it means compaction cannot perform
// merge with an earlier internal key, thus merge_context_.GetNumOperands()
// is 1.
assert(ts_sz == 0 || merge_context_.GetNumOperands() == 1);
size_t num_merge_operands = merge_context_.GetNumOperands();
if (ts_sz && num_merge_operands > 1) {
// We do not merge merge operands with different timestamps if they are
// not eligible for GC.
ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands",
static_cast<int>(ts_sz),
static_cast<int>(num_merge_operands));
assert(false);
}
}

if (merge_context_.GetNumOperands() == 0) {
Expand Down

0 comments on commit 18cb731

Please sign in to comment.