Skip to content

Commit

Permalink
Optimize sequential insert into memtable - Part 1: Interface
Browse files Browse the repository at this point in the history
Summary:
Currently our skip-list have an optimization to speedup sequential
inserts from a single stream, by remembering the last insert position.
We extend the idea to support sequential inserts from multiple streams,
and even tolerate small reordering wihtin each stream.

This PR is the interface part adding the following:
- Add `memtable_insert_prefix_extractor` to allow specifying prefix for each key.
- Add `InsertWithHint()` interface to memtable, to allow underlying
  implementation to return a hint of insert position, which can be later
  pass back to optimize inserts.
- Memtable will maintain a map from prefix to hints and pass the hint
  via `InsertWithHint()` if `memtable_insert_prefix_extractor` is non-null.
Closes facebook#1419

Differential Revision: D4079367

Pulled By: yiwu-arbug

fbshipit-source-id: 3555326
  • Loading branch information
Yi Wu authored and Facebook Github Bot committed Nov 14, 2016
1 parent df5eeb8 commit 1ea79a7
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 4 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ set(TESTS
db/db_inplace_update_test.cc
db/db_iter_test.cc
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_table_properties_test.cc
Expand Down
3 changes: 2 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
* Suppor dynamically change `delayed_write_rate` option via SetDBOptions().

### New Features
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable.
* Add avoid_flush_during_shutdown option, which speeds up DB shutdown by not flushing unpersisted data (i.e. with disableWAL = true). Unpersisted data will be lost. The options is dynamically changeable via SetDBOptions().
* Add memtable_insert_with_hint_prefix_extractor option. The option is mean to reduce CPU usage for inserting keys into memtable, if keys can be group by prefix and insert for each prefix are sequential or almost sequential. See include/rocksdb/options.h for more details.

## 4.13.0 (10/18/2016)
### Public API Change
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ TESTS = \
db_flush_test \
db_inplace_update_test \
db_iterator_test \
db_memtable_test \
db_options_test \
db_sst_test \
external_sst_file_test \
Expand Down Expand Up @@ -955,6 +956,9 @@ db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJEC
db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
5 changes: 5 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.max_compaction_bytes = result.target_file_size_base * 25;
}

// Insert into memtable with hint is incompatible with concurrent inserts.
if (db_options.allow_concurrent_memtable_write) {
result.memtable_insert_with_hint_prefix_extractor = nullptr;
}

return result;
}

Expand Down
160 changes: 160 additions & 0 deletions db/db_memtable_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <memory>
#include <string>

#include "db/db_test_util.h"
#include "db/memtable.h"
#include "port/stack_trace.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice_transform.h"

namespace rocksdb {

class DBMemTableTest : public DBTestBase {
public:
DBMemTableTest() : DBTestBase("/db_memtable_test") {}
};

class MockMemTableRep : public MemTableRep {
public:
explicit MockMemTableRep(MemTableAllocator* allocator, MemTableRep* rep)
: MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}

virtual KeyHandle Allocate(const size_t len, char** buf) override {
return rep_->Allocate(len, buf);
}

virtual void Insert(KeyHandle handle) override {
return rep_->Insert(handle);
}

virtual void InsertWithHint(KeyHandle handle, void** hint) override {
num_insert_with_hint_++;
ASSERT_NE(nullptr, hint);
last_hint_in_ = *hint;
rep_->InsertWithHint(handle, hint);
last_hint_out_ = *hint;
}

virtual bool Contains(const char* key) const override {
return rep_->Contains(key);
}

virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override {
rep_->Get(k, callback_args, callback_func);
}

virtual size_t ApproximateMemoryUsage() override {
return rep_->ApproximateMemoryUsage();
}

virtual Iterator* GetIterator(Arena* arena) override {
return rep_->GetIterator(arena);
}

void* last_hint_in() { return last_hint_in_; }
void* last_hint_out() { return last_hint_out_; }
int num_insert_with_hint() { return num_insert_with_hint_; }

private:
std::unique_ptr<MemTableRep> rep_;
void* last_hint_in_;
void* last_hint_out_;
int num_insert_with_hint_;
};

class MockMemTableRepFactory : public MemTableRepFactory {
public:
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
MemTableAllocator* allocator,
const SliceTransform* transform,
Logger* logger) override {
SkipListFactory factory;
MemTableRep* skiplist_rep =
factory.CreateMemTableRep(cmp, allocator, transform, logger);
mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
return mock_rep_;
}

virtual const char* Name() const override { return "MockMemTableRepFactory"; }

MockMemTableRep* rep() { return mock_rep_; }

private:
MockMemTableRep* mock_rep_;
};

class TestPrefixExtractor : public SliceTransform {
public:
virtual const char* Name() const override { return "TestPrefixExtractor"; }

virtual Slice Transform(const Slice& key) const override {
const char* p = separator(key);
if (p == nullptr) {
return Slice();
}
return Slice(key.data(), p - key.data() + 1);
}

virtual bool InDomain(const Slice& key) const override {
return separator(key) != nullptr;
}

virtual bool InRange(const Slice& key) const override { return false; }

private:
const char* separator(const Slice& key) const {
return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
}
};

TEST_F(DBMemTableTest, InsertWithHint) {
Options options;
options.create_if_missing = true;
options.memtable_factory.reset(new MockMemTableRepFactory());
options.memtable_insert_with_hint_prefix_extractor.reset(
new TestPrefixExtractor());
Reopen(options);
MockMemTableRep* rep =
reinterpret_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
->rep();
ASSERT_OK(Put("foo_k1", "foo_v1"));
ASSERT_EQ(nullptr, rep->last_hint_in());
void* hint_foo = rep->last_hint_out();
ASSERT_OK(Put("foo_k2", "foo_v2"));
ASSERT_EQ(hint_foo, rep->last_hint_in());
ASSERT_EQ(hint_foo, rep->last_hint_out());
ASSERT_OK(Put("foo_k3", "foo_v3"));
ASSERT_EQ(hint_foo, rep->last_hint_in());
ASSERT_EQ(hint_foo, rep->last_hint_out());
ASSERT_OK(Put("bar_k1", "bar_v1"));
ASSERT_EQ(nullptr, rep->last_hint_in());
void* hint_bar = rep->last_hint_out();
ASSERT_NE(hint_foo, hint_bar);
ASSERT_OK(Put("bar_k2", "bar_v2"));
ASSERT_EQ(hint_bar, rep->last_hint_in());
ASSERT_EQ(hint_bar, rep->last_hint_out());
ASSERT_EQ(5, rep->num_insert_with_hint());
ASSERT_OK(Put("whitelisted", "vvv"));
ASSERT_EQ(5, rep->num_insert_with_hint());
ASSERT_EQ("foo_v1", Get("foo_k1"));
ASSERT_EQ("foo_v2", Get("foo_k2"));
ASSERT_EQ("foo_v3", Get("foo_k3"));
ASSERT_EQ("bar_v1", Get("bar_k1"));
ASSERT_EQ("bar_v2", Get("bar_k2"));
ASSERT_EQ("vvv", Get("whitelisted"));
}

} // namespace rocksdb

int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
18 changes: 16 additions & 2 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
: 0),
prefix_extractor_(ioptions.prefix_extractor),
flush_state_(FLUSH_NOT_REQUESTED),
env_(ioptions.env) {
env_(ioptions.env),
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand Down Expand Up @@ -423,6 +425,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,

char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
Slice key_slice(p, key_size);
p += key_size;
uint64_t packed = PackSequenceAndType(s, type);
EncodeFixed64(p, packed);
Expand All @@ -431,7 +434,18 @@ void MemTable::Add(SequenceNumber s, ValueType type,
memcpy(p, value.data(), val_size);
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
if (!allow_concurrent) {
table->Insert(handle);
// Extract prefix for insert with hint.
Slice prefix;
if (insert_with_hint_prefix_extractor_ != nullptr) {
if (insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
}
}
if (prefix.empty()) {
table->Insert(handle);
} else {
table->InsertWithHint(handle, &insert_hints_[prefix]);
}

// this is a bit ugly, but is the way to avoid locked instructions
// when incrementing an atomic
Expand Down
8 changes: 8 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/dbformat.h"
#include "db/memtable_allocator.h"
Expand All @@ -25,6 +26,7 @@
#include "util/cf_options.h"
#include "util/concurrent_arena.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"
#include "util/instrumented_mutex.h"

namespace rocksdb {
Expand Down Expand Up @@ -390,6 +392,12 @@ class MemTable {

Env* env_;

// Extract sequential insert prefixes.
const SliceTransform* insert_with_hint_prefix_extractor_;

// Insert hints for each prefix.
std::unordered_map<Slice, void*, SliceHasher> insert_hints_;

// Returns a heuristic flush decision
bool ShouldFlushNow() const;

Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class MemTableRep {
// collection, and no concurrent modifications to the table in progress
virtual void Insert(KeyHandle handle) = 0;

// Same as Insert(), but in additional pass a hint to optimize sequential
// inserts. A new hint will be return from the hint pointer. Caller can get
// an initial hint by passing hint pointing to nullptr.
virtual void InsertWithHint(KeyHandle handle, void** hint) {
// Ignore the hint by default.
Insert(handle);
}

// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles
virtual void InsertConcurrently(KeyHandle handle) {
Expand Down
24 changes: 24 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,30 @@ struct ColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
size_t memtable_huge_page_size;

// If non-nullptr, memtable will use the specified function to extract
// prefixes for keys, and for each non-empty prefix maintain a hint to
// reduce CPU usage for inserting keys with the prefix. Keys with empty
// prefix will be insert without using a hint.
//
// Currently only the default skiplist based memtable implements the feature.
// All other memtable implementation will ignore the option. It incurs ~150
// additional bytes of memory overhead to store a hint for each prefix.
// If allow_concurrent_memtable_write is true, the option will also be
// ignored.
//
// The option is best suited for sequential inserts, or inserts that's
// almost sequential. One scenario could be inserting keys of the form
// (prefix + timestamp), and keys of the same prefix always comes in
// with time order, or in some cases a key with a smaller timestamp comes
// in later due to network latency.
//
// REQUIRES: If custom comparator is provided, it has to make sure keys
// with the same prefix appear in consecutive range.
//
// Default: nullptr (disable)
std::shared_ptr<const SliceTransform>
memtable_insert_with_hint_prefix_extractor;

// Control locality of bloom filter probes to improve cache miss rate.
// This option only applies to memtable prefix bloom and plaintable
// prefix bloom. It essentially limits every bloom checking to one cache line.
Expand Down
6 changes: 6 additions & 0 deletions memtable/skiplistrep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class SkipListRep : public MemTableRep {
skip_list_.Insert(static_cast<char*>(handle));
}

virtual void InsertWithHint(KeyHandle handle, void** hint) override {
skip_list_.InsertWithHint(
static_cast<char*>(handle),
reinterpret_cast<decltype(skip_list_)::InsertHint**>(hint));
}

virtual void InsertConcurrently(KeyHandle handle) override {
skip_list_.InsertConcurrently(static_cast<char*>(handle));
}
Expand Down
4 changes: 3 additions & 1 deletion util/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
force_consistency_checks(cf_options.force_consistency_checks),
listeners(db_options.listeners),
row_cache(db_options.row_cache),
max_subcompactions(db_options.max_subcompactions) {}
max_subcompactions(db_options.max_subcompactions),
memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor.get()) {}

// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {
Expand Down
2 changes: 2 additions & 0 deletions util/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ struct ImmutableCFOptions {
std::shared_ptr<Cache> row_cache;

uint32_t max_subcompactions;

const SliceTransform* memtable_insert_with_hint_prefix_extractor;
};

struct MutableCFOptions {
Expand Down
5 changes: 5 additions & 0 deletions util/hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ inline uint32_t GetSliceHash(const Slice& s) {
return Hash(s.data(), s.size(), 397);
}

// std::hash compatible interface.
struct SliceHasher {
uint32_t operator()(const Slice& s) const { return GetSliceHash(s); }
};

} // namespace rocksdb
7 changes: 7 additions & 0 deletions util/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
inplace_callback(nullptr),
memtable_prefix_bloom_size_ratio(0.0),
memtable_huge_page_size(0),
memtable_insert_with_hint_prefix_extractor(nullptr),
bloom_locality(0),
max_successive_merges(0),
min_partial_merge_operands(2),
Expand Down Expand Up @@ -145,6 +146,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
memtable_prefix_bloom_size_ratio(
options.memtable_prefix_bloom_size_ratio),
memtable_huge_page_size(options.memtable_huge_page_size),
memtable_insert_with_hint_prefix_extractor(
options.memtable_insert_with_hint_prefix_extractor),
bloom_locality(options.bloom_locality),
max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands),
Expand Down Expand Up @@ -463,6 +466,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
: CompressionTypeToString(bottommost_compression).c_str());
Header(log, " Options.prefix_extractor: %s",
prefix_extractor == nullptr ? "nullptr" : prefix_extractor->Name());
Header(log, " Options.memtable_insert_with_hint_prefix_extractor: %s",
memtable_insert_with_hint_prefix_extractor == nullptr
? "nullptr"
: memtable_insert_with_hint_prefix_extractor->Name());
Header(log, " Options.num_levels: %d", num_levels);
Header(log, " Options.min_write_buffer_number_to_merge: %d",
min_write_buffer_number_to_merge);
Expand Down
Loading

0 comments on commit 1ea79a7

Please sign in to comment.