Skip to content

Commit

Permalink
WritePrepared Txn: Duplicate Keys, Memtable part
Browse files Browse the repository at this point in the history
Summary:
Currently DB does not accept duplicate keys (keys with the same user key and the same sequence number). If Memtable returns false when receiving such keys, we can benefit from this signal to properly increase the sequence number in the rare cases when we have a duplicate key in the write batch written to DB under WritePrepared transactions.
Closes facebook#3418

Differential Revision: D6822412

Pulled By: maysamyabandeh

fbshipit-source-id: adea3ce5073131cd38ed52b16bea0673b1a19e77
  • Loading branch information
Maysam Yabandeh authored and facebook-github-bot committed Feb 1, 2018
1 parent e62a763 commit 813719e
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 60 deletions.
87 changes: 83 additions & 4 deletions db/db_memtable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ class MockMemTableRep : public MemTableRep {
return rep_->Allocate(len, buf);
}

virtual void Insert(KeyHandle handle) override {
virtual bool Insert(KeyHandle handle) override {
return rep_->Insert(handle);
}

virtual void InsertWithHint(KeyHandle handle, void** hint) override {
virtual bool InsertWithHint(KeyHandle handle, void** hint) override {
num_insert_with_hint_++;
ASSERT_NE(nullptr, hint);
EXPECT_NE(nullptr, hint);
last_hint_in_ = *hint;
rep_->InsertWithHint(handle, hint);
bool res = rep_->InsertWithHint(handle, hint);
last_hint_out_ = *hint;
return res;
}

virtual bool Contains(const char* key) const override {
Expand Down Expand Up @@ -129,6 +130,84 @@ class TestPrefixExtractor : public SliceTransform {
}
};

// Test that ::Add properly returns false when inserting duplicate keys
TEST_F(DBMemTableTest, DuplicateSeq) {
SequenceNumber seq = 123;
std::string value;
Status s;
MergeContext merge_context;
Options options;
InternalKeyComparator ikey_cmp(options.comparator);
RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */);

// Create a MemTable
InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
kMaxSequenceNumber, 0 /* column_family_id */);

// Write some keys and make sure it returns false on duplicates
bool res;
res = mem->Add(seq, kTypeValue, "key", "value2");
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key", "value2");
ASSERT_FALSE(res);
// Changing the type should still cause the duplicatae key
res = mem->Add(seq, kTypeMerge, "key", "value2");
ASSERT_FALSE(res);
// Changing the seq number will make the key fresh
res = mem->Add(seq + 1, kTypeMerge, "key", "value2");
ASSERT_TRUE(res);
// Test with different types for duplicate keys
res = mem->Add(seq, kTypeDeletion, "key", "");
ASSERT_FALSE(res);
res = mem->Add(seq, kTypeSingleDeletion, "key", "");
ASSERT_FALSE(res);

// Test the duplicate keys under stress
for (int i = 0; i < 10000; i++) {
bool insert_dup = i % 10 == 1;
if (!insert_dup) {
seq++;
}
res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
if (insert_dup) {
ASSERT_FALSE(res);
} else {
ASSERT_TRUE(res);
}
}
delete mem;

// Test with InsertWithHint
options.memtable_insert_with_hint_prefix_extractor.reset(
new TestPrefixExtractor()); // which uses _ to extract the prefix
ioptions = ImmutableCFOptions(options);
mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
kMaxSequenceNumber, 0 /* column_family_id */);
// Insert a duplicate key with _ in it
res = mem->Add(seq, kTypeValue, "key_1", "value");
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key_1", "value");
ASSERT_FALSE(res);
delete mem;

// Test when InsertConcurrently will be invoked
options.allow_concurrent_memtable_write = true;
ioptions = ImmutableCFOptions(options);
mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
kMaxSequenceNumber, 0 /* column_family_id */);
MemTablePostProcessInfo post_process_info;
res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
ASSERT_FALSE(res);
delete mem;
}

TEST_F(DBMemTableTest, InsertWithHint) {
Options options;
options.allow_concurrent_memtable_write = false;
Expand Down
8 changes: 4 additions & 4 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5114,10 +5114,10 @@ TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
ASSERT_OK(Flush());
}
std::thread manual_compaction_thread([this]() {
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = true;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
});
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = true;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
});

TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
for (int i = 0; i < kNumL0Files; ++i) {
Expand Down
4 changes: 2 additions & 2 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ class SpecialMemTableRep : public MemTableRep {

// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
virtual void Insert(KeyHandle handle) override {
memtable_->Insert(handle);
virtual bool Insert(KeyHandle handle) override {
num_entries_++;
return memtable_->Insert(handle);
}

// Returns true iff an entry that compares equal to key is in the list.
Expand Down
20 changes: 20 additions & 0 deletions db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
return r;
}

int InternalKeyComparator::CompareKeySeq(const Slice& akey,
const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
// Shift the number to exclude the last byte which contains the value type
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8;
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8;
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}

int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const {
// Order by:
Expand Down
2 changes: 2 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class InternalKeyComparator

virtual const char* Name() const override;
virtual int Compare(const Slice& a, const Slice& b) const override;
// Same as Compare except that it excludes the value type from comparison
virtual int CompareKeySeq(const Slice& a, const Slice& b) const;
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const override;
virtual void FindShortSuccessor(std::string* key) const override;
Expand Down
39 changes: 26 additions & 13 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,24 +225,24 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
// Internal keys are encoded as length-prefixed strings.
Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
return comparator.Compare(k1, k2);
return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
const Slice& key)
const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(prefix_len_key);
return comparator.Compare(a, key);
return comparator.CompareKeySeq(a, key);
}

void MemTableRep::InsertConcurrently(KeyHandle handle) {
bool MemTableRep::InsertConcurrently(KeyHandle handle) {
#ifndef ROCKSDB_LITE
throw std::runtime_error("concurrent insert not supported");
throw std::runtime_error("concurrent insert not supported");
#else
abort();
abort();
#endif
}
}

Slice MemTableRep::UserKey(const char* key) const {
Slice slice = GetLengthPrefixedSlice(key);
Expand Down Expand Up @@ -444,7 +444,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
return {entry_count * (data_size / n), entry_count};
}

void MemTable::Add(SequenceNumber s, ValueType type,
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
Expand Down Expand Up @@ -479,9 +479,15 @@ void MemTable::Add(SequenceNumber s, ValueType type,
if (insert_with_hint_prefix_extractor_ != nullptr &&
insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
table->InsertWithHint(handle, &insert_hints_[prefix]);
bool res = table->InsertWithHint(handle, &insert_hints_[prefix]);
if (!res) {
return res;
}
} else {
table->Insert(handle);
bool res = table->Insert(handle);
if (!res) {
return res;
}
}

// this is a bit ugly, but is the way to avoid locked instructions
Expand Down Expand Up @@ -514,7 +520,10 @@ void MemTable::Add(SequenceNumber s, ValueType type,
assert(post_process_info == nullptr);
UpdateFlushState();
} else {
table->InsertConcurrently(handle);
bool res = table->InsertConcurrently(handle);
if (!res) {
return res;
}

assert(post_process_info != nullptr);
post_process_info->num_entries++;
Expand Down Expand Up @@ -544,6 +553,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
is_range_del_table_empty_ = false;
}
UpdateOldestKeyTime();
return true;
}

// Callback from MemTable::Get()
Expand Down Expand Up @@ -799,8 +809,9 @@ void MemTable::Update(SequenceNumber seq,
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
ValueType type;
SequenceNumber unused;
UnPackSequenceAndType(tag, &unused, &type);
SequenceNumber existing_seq;
UnPackSequenceAndType(tag, &existing_seq, &type);
assert(existing_seq != seq);
if (type == kTypeValue) {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
Expand All @@ -823,7 +834,9 @@ void MemTable::Update(SequenceNumber seq,
}

// key doesn't exist
Add(seq, kTypeValue, key, value);
bool add_res __attribute__((__unused__)) = Add(seq, kTypeValue, key, value);
// We already checked unused != seq above. In that case, Add should not fail.
assert(add_res);
}

bool MemTable::UpdateCallback(SequenceNumber seq,
Expand Down
5 changes: 4 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ class MemTable {
//
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
//
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr);

Expand Down
27 changes: 22 additions & 5 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,31 @@ class MemTableRep {
// single buffer and pass that in as the parameter to Insert).
// REQUIRES: nothing that compares equal to key is currently in the
// collection, and no concurrent modifications to the table in progress
virtual void Insert(KeyHandle handle) = 0;
//
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool Insert(KeyHandle handle) = 0;

// Same as Insert(), but in additional pass a hint to insert location for
// the key. If hint points to nullptr, a new hint will be populated.
// otherwise the hint will be updated to reflect the last insert location.
//
// Currently only skip-list based memtable implement the interface. Other
// implementations will fallback to Insert() by default.
virtual void InsertWithHint(KeyHandle handle, void** hint) {
//
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool InsertWithHint(KeyHandle handle, void** hint) {
// Ignore the hint by default.
Insert(handle);
return Insert(handle);
}

// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles
virtual void InsertConcurrently(KeyHandle handle);
// to InsertConcurrently for other handles.
//
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool InsertConcurrently(KeyHandle handle);

// Returns true iff an entry that compares equal to key is in the collection.
virtual bool Contains(const char* key) const = 0;
Expand Down Expand Up @@ -226,6 +235,12 @@ class MemTableRepFactory {
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }

// Return true if the current MemTableRep supports detecting duplicate
// <key,seq> at insertion time. If true, then MemTableRep::Insert* returns
// false when if the <key,seq> already exists.
// Default: false
virtual bool CanHandleDuplicatedKey() const { return false; }
};

// This uses a skip list to store keys. It is the default.
Expand All @@ -247,6 +262,8 @@ class SkipListFactory : public MemTableRepFactory {

bool IsInsertConcurrentlySupported() const override { return true; }

bool CanHandleDuplicatedKey() const override { return true; }

private:
const size_t lookahead_;
};
Expand Down
9 changes: 5 additions & 4 deletions memtable/hash_cuckoo_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class HashCuckooRep : public MemTableRep {
// Insert the specified key (internal_key) into the mem-table. Assertion
// fails if
// the current mem-table already contains the specified key.
virtual void Insert(KeyHandle handle) override;
virtual bool Insert(KeyHandle handle) override;

// This function returns bucket_count_ * approximate_entry_size_ when any
// of the followings happen to disallow further write operations:
Expand Down Expand Up @@ -319,7 +319,7 @@ void HashCuckooRep::Get(const LookupKey& key, void* callback_args,
}
}

void HashCuckooRep::Insert(KeyHandle handle) {
bool HashCuckooRep::Insert(KeyHandle handle) {
static const float kMaxFullness = 0.90f;

auto* key = static_cast<char*>(handle);
Expand All @@ -340,7 +340,7 @@ void HashCuckooRep::Insert(KeyHandle handle) {
is_nearly_full_ = true;
}
backup_table_->Insert(key);
return;
return true;
}
// when reaching this point, means the insert can be done successfully.
occupied_count_++;
Expand All @@ -349,7 +349,7 @@ void HashCuckooRep::Insert(KeyHandle handle) {
}

// perform kickout process if the length of cuckoo path > 1.
if (cuckoo_path_length == 0) return;
if (cuckoo_path_length == 0) return true;

// the cuckoo path stores the kickout path in reverse order.
// so the kickout or displacement is actually performed
Expand All @@ -366,6 +366,7 @@ void HashCuckooRep::Insert(KeyHandle handle) {
}
int insert_key_bid = cuckoo_path_[cuckoo_path_length - 1];
cuckoo_array_[insert_key_bid].store(key, std::memory_order_release);
return true;
}

bool HashCuckooRep::Contains(const char* internal_key) const {
Expand Down
Loading

0 comments on commit 813719e

Please sign in to comment.