
Commit

Concurrent memtable inserter to update counters and flush state after all inserts

Summary: In the concurrent memtable insert case, updating counters in MemTable::Add() can account for about 5% of CPU usage. By batching all the counter updates and applying them once at the end of the write batch, this CPU overhead is amortized in use cases where more than one key is updated in one write batch.
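
To make the mechanism concrete, here is a minimal, self-contained sketch of the pattern (not the actual RocksDB code): a hypothetical ToyMemTable and PostProcessInfo stand in for the real types, and the 128-key batch and 32-byte entry size are made-up illustration values. Per-insert updates go to a plain local struct keyed by memtable, and the shared atomic counters are touched only once per memtable after the whole batch, mirroring what MemTablePostProcessInfo and MemTable::BatchPostProcess do in this change.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <map>

// Local, non-atomic counters accumulated while inserting one write batch.
struct PostProcessInfo {
  uint64_t data_size = 0;
  uint64_t num_entries = 0;
  uint64_t num_deletes = 0;
};

// Simplified stand-in for a memtable's shared, atomically updated statistics.
class ToyMemTable {
 public:
  // Fold the batched deltas in once, instead of one fetch_add per insert.
  void BatchPostProcess(const PostProcessInfo& info) {
    num_entries_.fetch_add(info.num_entries, std::memory_order_relaxed);
    data_size_.fetch_add(info.data_size, std::memory_order_relaxed);
    num_deletes_.fetch_add(info.num_deletes, std::memory_order_relaxed);
  }
  uint64_t num_entries() const { return num_entries_.load(); }

 private:
  std::atomic<uint64_t> num_entries_{0};
  std::atomic<uint64_t> data_size_{0};
  std::atomic<uint64_t> num_deletes_{0};
};

int main() {
  ToyMemTable mem;
  std::map<ToyMemTable*, PostProcessInfo> local;  // one accumulator per memtable

  // Insert a 128-key batch: only the local, uncontended counters are touched.
  for (int i = 0; i < 128; ++i) {
    PostProcessInfo& info = local[&mem];
    info.num_entries++;
    info.data_size += 32;  // pretend every encoded entry is 32 bytes
  }

  // One atomic update per counter per memtable after the whole batch.
  for (auto& kv : local) {
    kv.first->BatchPostProcess(kv.second);
  }
  std::cout << "entries: " << mem.num_entries() << "\n";  // prints "entries: 128"
  return 0;
}

The saving comes from replacing one set of contended fetch_add calls per key with a single set per memtable per batch.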

Test Plan:
Write throughput increases 12% with this benchmark setting:

TEST_TMPDIR=/dev/shm/ ./db_bench --benchmarks=fillrandom -disable_auto_compactions -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -num=10000000 --writes=1000000 -max_background_flushes=16 -max_write_buffer_number=16 --threads=64 --batch_size=128   -allow_concurrent_memtable_write -enable_write_thread_adaptive_yield

Reviewers: andrewkr, IslamAbdelRahman, ngbronson, igor

Reviewed By: ngbronson

Subscribers: ngbronson, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60495
siying committed Jul 8, 2016
1 parent 0f691c4 commit 907f24d
Showing 3 changed files with 60 additions and 10 deletions.
14 changes: 8 additions & 6 deletions db/memtable.cc
@@ -352,7 +352,8 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey,

void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
- const Slice& value, bool allow_concurrent) {
+ const Slice& value, bool allow_concurrent,
+ MemTablePostProcessInfo* post_process_info) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
@@ -406,13 +407,16 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}
assert(first_seqno_.load() >= earliest_seqno_.load());
}
+ assert(post_process_info == nullptr);
+ UpdateFlushState();
} else {
table_->InsertConcurrently(handle);

- num_entries_.fetch_add(1, std::memory_order_relaxed);
- data_size_.fetch_add(encoded_len, std::memory_order_relaxed);
+ assert(post_process_info != nullptr);
+ post_process_info->num_entries++;
+ post_process_info->data_size += encoded_len;
if (type == kTypeDeletion) {
- num_deletes_.fetch_add(1, std::memory_order_relaxed);
+ post_process_info->num_deletes++;
}

if (prefix_bloom_) {
@@ -432,8 +436,6 @@ void MemTable::Add(SequenceNumber s, ValueType type,
!first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
}
}

- UpdateFlushState();
}

// Callback from MemTable::Get()
23 changes: 22 additions & 1 deletion db/memtable.h
@@ -54,6 +54,15 @@ struct MemTableOptions {
Logger* info_log;
};

+ // Batched counters to updated when inserting keys in one write batch.
+ // In post process of the write batch, these can be updated together.
+ // Only used in concurrent memtable insert case.
+ struct MemTablePostProcessInfo {
+ uint64_t data_size = 0;
+ uint64_t num_entries = 0;
+ uint64_t num_deletes = 0;
+ };

// Note: Many of the methods in this class have comments indicating that
// external synchromization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
@@ -157,7 +166,8 @@ class MemTable {
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
- const Slice& value, bool allow_concurrent = false);
+ const Slice& value, bool allow_concurrent = false,
+ MemTablePostProcessInfo* post_process_info = nullptr);

// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
@@ -216,6 +226,17 @@ class MemTable {
// key in the memtable.
size_t CountSuccessiveMergeEntries(const LookupKey& key);

+ // Update counters and flush status after inserting a whole write batch
+ // Used in concurrent memtable inserts.
+ void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
+ num_entries_.fetch_add(update_counters.num_entries,
+ std::memory_order_relaxed);
+ data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
+ num_deletes_.fetch_add(update_counters.num_deletes,
+ std::memory_order_relaxed);
+ UpdateFlushState();
+ }

// Get total number of entries in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
33 changes: 30 additions & 3 deletions db/write_batch.cc
@@ -31,6 +31,7 @@

#include "rocksdb/write_batch.h"

+ #include <map>
#include <stack>
#include <stdexcept>
#include <vector>
@@ -693,6 +694,8 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t log_number_ref_;
DBImpl* db_;
const bool concurrent_memtable_writes_;
+ typedef std::map<MemTable*, MemTablePostProcessInfo> MemPostInfoMap;
+ MemPostInfoMap mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery)
WriteBatch* rebuilding_trx_;

@@ -718,6 +721,12 @@ class MemTableInserter : public WriteBatch::Handler {

SequenceNumber get_final_sequence() { return sequence_; }

+ void PostProcess() {
+ for (auto& pair : mem_post_info_map_) {
+ pair.first->BatchPostProcess(pair.second);
+ }
+ }

bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// If we are in a concurrent mode, it is the caller's responsibility
// to clone the original ColumnFamilyMemTables so that each thread
@@ -770,7 +779,8 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
- mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_);
+ mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_,
+ get_post_process_info(mem));
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
@@ -821,7 +831,8 @@ class MemTableInserter : public WriteBatch::Handler {
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
ValueType delete_type) {
MemTable* mem = cf_mems_->GetMemTable();
- mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_);
+ mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_,
+ get_post_process_info(mem));
sequence_++;
CheckMemtableFull();
return Status::OK();
@@ -1046,6 +1057,15 @@ class MemTableInserter : public WriteBatch::Handler {

return Status::OK();
}

+ private:
+ MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
+ if (!concurrent_memtable_writes_) {
+ // No need to batch counters locally if we don't use concurrent mode.
+ return nullptr;
+ }
+ return &mem_post_info_map_[mem];
+ }
};

// This function can only be called in these conditions:
@@ -1087,7 +1107,11 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
concurrent_memtable_writes);
assert(writer->ShouldWriteToMemtable());
inserter.set_log_number_ref(writer->log_ref);
- return writer->batch->Iterate(&inserter);
+ Status s = writer->batch->Iterate(&inserter);
+ if (concurrent_memtable_writes) {
+ inserter.PostProcess();
+ }
+ return s;
}

Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
@@ -1104,6 +1128,9 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
if (last_seq_used != nullptr) {
*last_seq_used = inserter.get_final_sequence();
}
+ if (concurrent_memtable_writes) {
+ inserter.PostProcess();
+ }
return s;
}

