Skip to content

Commit

Permalink
Iterator should be in corrupted status if merge operator return false
Browse files Browse the repository at this point in the history
Summary:
Iterator should be in corrupted status if merge operator return false.
Also add test to make sure if max_successive_merges is hit during write,
data will not be lost.
Closes facebook#1665

Differential Revision: D4322695

Pulled By: yiwu-arbug

fbshipit-source-id: b327b05
  • Loading branch information
Yi Wu authored and facebook-github-bot committed Dec 16, 2016
1 parent a8bf4d6 commit c270735
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 33 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ set(TESTS
db/db_iter_test.cc
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_table_properties_test.cc
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ TESTS = \
db_inplace_update_test \
db_iterator_test \
db_memtable_test \
db_merge_operator_test \
db_options_test \
db_range_del_test \
db_sst_test \
Expand Down Expand Up @@ -999,6 +1000,9 @@ db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA
db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
59 changes: 39 additions & 20 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ void DBIter::MergeValuesNewToOld() {
iter_->IsValuePinned() /* operand_pinned */);

ParsedInternalKey ikey;
Status s;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
if (!ParseKey(&ikey)) {
// skip corrupted key
Expand All @@ -538,9 +539,12 @@ void DBIter::MergeValuesNewToOld() {
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_);
if (!s.ok()) {
status_ = s;
}
// iter_ is positioned after put
iter_->Next();
return;
Expand All @@ -559,9 +563,12 @@ void DBIter::MergeValuesNewToOld() {
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
if (!s.ok()) {
status_ = s;
}
}

void DBIter::Prev() {
Expand Down Expand Up @@ -742,6 +749,7 @@ bool DBIter::FindValueForCurrentKey() {
FindParseableKey(&ikey, kReverse);
}

Status s;
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
Expand All @@ -753,16 +761,16 @@ bool DBIter::FindValueForCurrentKey() {
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeRangeDeletion) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_);
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_,
env_, &pinned_value_);
} else {
assert(last_not_merge_type == kTypeValue);
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
&pinned_value_,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetKey(), &pinned_value_,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_);
}
break;
case kTypeValue:
Expand All @@ -773,6 +781,9 @@ bool DBIter::FindValueForCurrentKey() {
break;
}
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}

Expand Down Expand Up @@ -818,28 +829,36 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
FindParseableKey(&ikey, kForward);
}

Status s;
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(ikey)) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}

const Slice& val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_, &pinned_value_);
valid_ = true;
if (!s.ok()) {
status_ = s;
}
return true;
}

Expand Down
98 changes: 98 additions & 0 deletions db/db_merge_operator_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <string>
#include <vector>

#include "db/db_test_util.h"
#include "db/forward_iterator.h"
#include "port/stack_trace.h"
#include "utilities/merge_operators.h"

namespace rocksdb {

// Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase {
public:
DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
};

// A test merge operator mimics put but also fails if one of merge operands is
// "corrupted".
class TestPutOperator : public MergeOperator {
public:
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
if (merge_in.existing_value != nullptr &&
*(merge_in.existing_value) == "corrupted") {
return false;
}
for (auto value : merge_in.operand_list) {
if (value == "corrupted") {
return false;
}
}
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}

virtual const char* Name() const override { return "TestPutOperator"; }
};

TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());
Reopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "corrupted"));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}

TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());
options.max_successive_merges = 3;
Reopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "v2"));
// Will trigger a merge when hitting max_successive_merges and the merge
// will fail. The delta will be inserted nevertheless.
ASSERT_OK(Merge("k1", "corrupted"));
// Data should stay unmerged after the error.
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}

TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new TestPutOperator());

DestroyAndReopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "corrupted"));
ASSERT_OK(Put("k2", "v2"));
VerifyDBFromMap({{"k1", ""}, {"k2", "v2"}}, nullptr, false,
{{"k1", Status::Corruption()}});
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

DestroyAndReopen(options);
ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Merge("k2", "corrupted"));
VerifyDBFromMap({{"k1", "v1"}, {"k2", ""}}, nullptr, false,
{{"k2", Status::Corruption()}});
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}

} // namespace rocksdb

int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
64 changes: 59 additions & 5 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "db/forward_iterator.h"

namespace rocksdb {

Expand Down Expand Up @@ -502,6 +503,15 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
}
}

Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
return db_->Merge(wo, k, v);
}

Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
WriteOptions wo) {
return db_->Merge(wo, handles_[cf], k, v);
}

Status DBTestBase::Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
Expand Down Expand Up @@ -1090,11 +1100,18 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
}

void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res, bool tailing_iter) {
size_t* total_reads_res, bool tailing_iter,
std::map<std::string, Status> status) {
size_t total_reads = 0;

for (auto& kv : true_data) {
ASSERT_EQ(Get(kv.first), kv.second);
Status s = status[kv.first];
if (s.ok()) {
ASSERT_EQ(Get(kv.first), kv.second);
} else {
std::string value;
ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
}
total_reads++;
}

Expand All @@ -1107,21 +1124,40 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
// Verify Iterator::Next()
iter_cnt = 0;
auto data_iter = true_data.begin();
Status s;
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
ASSERT_EQ(iter->key().ToString(), data_iter->first);
ASSERT_EQ(iter->value().ToString(), data_iter->second);
Status current_status = status[data_iter->first];
if (!current_status.ok()) {
s = current_status;
}
ASSERT_EQ(iter->status(), s);
if (current_status.ok()) {
ASSERT_EQ(iter->value().ToString(), data_iter->second);
}
iter_cnt++;
total_reads++;
}
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
<< true_data.size();
delete iter;

// Verify Iterator::Prev()
// Use a new iterator to make sure its status is clean.
iter = db_->NewIterator(ro);
iter_cnt = 0;
s = Status::OK();
auto data_rev = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
ASSERT_EQ(iter->key().ToString(), data_rev->first);
ASSERT_EQ(iter->value().ToString(), data_rev->second);
Status current_status = status[data_rev->first];
if (!current_status.ok()) {
s = current_status;
}
ASSERT_EQ(iter->status(), s);
if (current_status.ok()) {
ASSERT_EQ(iter->value().ToString(), data_rev->second);
}
iter_cnt++;
total_reads++;
}
Expand All @@ -1135,7 +1171,6 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
ASSERT_EQ(kv.second, iter->value().ToString());
total_reads++;
}

delete iter;
}

Expand Down Expand Up @@ -1177,6 +1212,25 @@ void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
}
}

void DBTestBase::VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data) {
Arena arena;
InternalKeyComparator icmp(last_options_.comparator);
RangeDelAggregator range_del_agg(icmp, {});
auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
iter->SeekToFirst();
for (auto p : true_data) {
ASSERT_TRUE(iter->Valid());
ParsedInternalKey ikey;
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
ASSERT_EQ(p.first, ikey.user_key);
ASSERT_EQ(p.second, iter->value());
iter->Next();
};
ASSERT_FALSE(iter->Valid());
iter->~InternalIterator();
}

#ifndef ROCKSDB_LITE

uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
Expand Down
12 changes: 11 additions & 1 deletion db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ class DBTestBase : public testing::Test {
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());

Status Merge(const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());

Status Merge(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions());

Status Delete(const std::string& k);

Status Delete(int cf, const std::string& k);
Expand Down Expand Up @@ -827,7 +833,11 @@ class DBTestBase : public testing::Test {

void VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res = nullptr,
bool tailing_iter = false);
bool tailing_iter = false,
std::map<std::string, Status> status = {});

void VerifyDBInternal(
std::vector<std::pair<std::string, std::string>> true_data);

#ifndef ROCKSDB_LITE
uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
Expand Down
Loading

0 comments on commit c270735

Please sign in to comment.