Skip to content

Commit

Permalink
Reduce malloc of iterators in Get() code paths
Browse files Browse the repository at this point in the history
Summary:
This patch optimized Get() code paths by avoiding malloc of iterators. Iterator creation is moved to mem table rep implementations, where a callback is called when any key is found. This is the same practice as what we do in (SST) table readers.

db_bench result for readrandom following a writeseq, with no compression, single thread and tmpfs, we see throughput improved to 144958 from 139027, about 3%.

Test Plan: make all check

Reviewers: dhruba, haobo, igor

Reviewed By: haobo

CC: leveldb, yhchiang

Differential Revision: https://reviews.facebook.net/D14685
  • Loading branch information
siying committed Feb 11, 2014
1 parent d4b789f commit 3304266
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 99 deletions.
232 changes: 136 additions & 96 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,116 +207,147 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}
}

// Callback from MemTable::Get()
namespace {

struct Saver {
Status* status;
const LookupKey* key;
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
MemTable* mem;
Logger* logger;
Statistics* statistics;
bool inplace_update_support;
};
} // namespace

static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg);
MergeContext* merge_context = s->merge_context;
const MergeOperator* merge_operator = s->merge_operator;

assert(s != nullptr && merge_context != nullptr);

// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
assert(merge_operator);
if (!merge_operator->FullMerge(s->key->user_key(), &v,
merge_context->GetOperands(), s->value,
s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
} else {
s->value->assign(v.data(), v.size());
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->Unlock();
}
*(s->found_final_value) = true;
return false;
}
case kTypeDeletion: {
if (*(s->merge_in_progress)) {
assert(merge_operator);
*(s->status) = Status::OK();
if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value,
s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true;
return false;
}
case kTypeMerge: {
std::string merge_result; // temporary area for merge results later
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true;
merge_context->PushOperand(v);
while (merge_context->GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(
s->key->user_key(), merge_context->GetOperand(0),
merge_context->GetOperand(1), &merge_result, s->logger)) {
merge_context->PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
return true;
}
default:
assert(false);
return true;
}
}

// s->state could be Corrupt, merge or notfound
return false;
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
StopWatchNano memtable_get_timer(options.env, false);
StartPerfTimer(&memtable_get_timer);

Slice mem_key = key.memtable_key();
Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();

std::unique_ptr<MemTableRep::Iterator> iter;
if (prefix_bloom_ &&
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
// iter is null if prefix bloom says the key does not exist
} else {
iter.reset(table_->GetIterator(user_key));
iter->Seek(key.internal_key(), mem_key.data());
}

bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get();
auto logger = options.info_log;
std::string merge_result;

bool found_final_value = false;
for (; !found_final_value && iter && iter->Valid(); iter->Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length = 0;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
if (options.inplace_update_support) {
GetLock(key.user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*s = Status::OK();
if (merge_in_progress) {
assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
value->assign(v.data(), v.size());
}
if (options.inplace_update_support) {
GetLock(key.user_key())->Unlock();
}
found_final_value = true;
break;
}
case kTypeDeletion: {
if (merge_in_progress) {
assert(merge_operator);
*s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
*s = Status::NotFound();
}
found_final_value = true;
break;
}
case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
merge_in_progress = true;
merge_context.PushOperand(v);
while(merge_context.GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(key.user_key(),
merge_context.GetOperand(0),
merge_context.GetOperand(1),
&merge_result, logger.get())) {
merge_context.PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
break;
}
default:
assert(false);
break;
}
} else {
// exit loop if user key does not match
break;
}
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.status = s;
saver.mem = this;
saver.merge_context = &merge_context;
saver.merge_operator = options.merge_operator.get();
saver.logger = options.info_log.get();
saver.inplace_update_support = options.inplace_update_support;
saver.statistics = options.statistics.get();
table_->Get(key, &saver, SaveValue);
}

// No change to value, since we have not yet found a Put/Delete

if (!found_final_value && merge_in_progress) {
*s = Status::MergeInProgress("");
}
Expand Down Expand Up @@ -488,4 +519,13 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
return num_successive_merges;
}

void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto iter = GetIterator(k.user_key());
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {
}
}

} // namespace rocksdb
10 changes: 7 additions & 3 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ class MemTable {
// Notify the underlying storage that no more items will be added
void MarkImmutable() { table_->MarkReadOnly(); }

// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);

const InternalKeyComparator& GetInternalKeyComparator() const {
return comparator_.comparator;
}

private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;
Expand Down Expand Up @@ -190,9 +197,6 @@ class MemTable {
MemTable(const MemTable&);
void operator=(const MemTable&);

// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);

const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
};
Expand Down
1 change: 1 addition & 0 deletions db/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#pragma once
#include <assert.h>
#include <stdlib.h>
#include "util/arena.h"
#include "port/port.h"
#include "util/arena.h"
#include "util/random.h"
Expand Down
1 change: 1 addition & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TableCache;
class Version;
class VersionSet;
class MergeContext;
class LookupKey;

// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
class WriteBatch;
class Env;

// Metadata associated with each SST file.
struct LiveFileMetaData {
Expand Down
15 changes: 15 additions & 0 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
namespace rocksdb {

class Arena;
class LookupKey;
class Slice;
class SliceTransform;

Expand Down Expand Up @@ -74,6 +75,20 @@ class MemTableRep {
// nothing.
virtual void MarkReadOnly() { }

// Look up key from the mem table, since the first key in the mem table whose
// user_key matches the one given k, call the function callback_func(), with
// callback_args directly forwarded as the first parameter, and the mem table
// key as the second parameter. If the return value is false, then terminates.
// Otherwise, go through the next key.
// It's safe for Get() to terminate after having finished all the potential
// key for the k.user_key(), or not.
//
// Default:
// Get() function with a default value of dynamically construct an iterator,
// seek and call the call back function.
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry));

// Report an approximation of how much memory has been used other than memory
// that was allocated through the arena.
virtual size_t ApproximateMemoryUsage() = 0;
Expand Down
17 changes: 17 additions & 0 deletions util/hash_linklist_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class HashLinkListRep : public MemTableRep {

virtual size_t ApproximateMemoryUsage() override;

virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override;

virtual ~HashLinkListRep();

virtual MemTableRep::Iterator* GetIterator() override;
Expand Down Expand Up @@ -398,6 +402,19 @@ size_t HashLinkListRep::ApproximateMemoryUsage() {
return 0;
}

void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Iterator iter(this, bucket);
for (iter.Seek(k.internal_key(), nullptr);
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
}

MemTableRep::Iterator* HashLinkListRep::GetIterator() {
auto list = new FullList(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
Expand Down
17 changes: 17 additions & 0 deletions util/hash_skiplist_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class HashSkipListRep : public MemTableRep {

virtual size_t ApproximateMemoryUsage() override;

virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override;

virtual ~HashSkipListRep();

virtual MemTableRep::Iterator* GetIterator() override;
Expand Down Expand Up @@ -271,6 +275,19 @@ size_t HashSkipListRep::ApproximateMemoryUsage() {
return sizeof(buckets_);
}

void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Bucket::Iterator iter(bucket);
for (iter.Seek(k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
}

MemTableRep::Iterator* HashSkipListRep::GetIterator() {
auto list = new Bucket(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
Expand Down
Loading

0 comments on commit 3304266

Please sign in to comment.