Skip to content

Commit

Permalink
Benchmarking for Merge Operator
Browse files Browse the repository at this point in the history
Summary:
Updated db_bench and utilities/merge_operators.h to allow for dynamic benchmarking
of merge operators in db_bench. Added a new test (--benchmarks=mergerandom), which performs
a bunch of random Merge() operations over random keys. Also added a "--merge_operator=" flag
so that the tester can easily benchmark different merge operators. Currently supports
the PutOperator and UInt64Add operator. Support for stringappend or list append may come later.

Test Plan:
	1. make db_bench
	2. Test the PutOperator (simulating Put) as follows:
./db_bench --benchmarks=fillrandom,readrandom,updaterandom,readrandom,mergerandom,readrandom --merge_operator=put
--threads=2

3. Test the UInt64AddOperator (simulating numeric addition) similarly:
./db_bench --value_size=8 --benchmarks=fillrandom,readrandom,updaterandom,readrandom,mergerandom,readrandom
--merge_operator=uint64add --threads=2

Reviewers: haobo, dhruba, zshao, MarkCallaghan

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11535
  • Loading branch information
dojiboy9 committed Aug 16, 2013
1 parent f3dea8c commit ad48c3c
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 20 deletions.
130 changes: 127 additions & 3 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "util/string_util.h"
#include "util/testutil.h"
#include "hdfs/env_hdfs.h"
#include "utilities/merge_operators.h"

// Comma-separated list of operations to run in the specified order
// Actual benchmarks:
Expand All @@ -43,6 +44,9 @@
// readwhilewriting -- 1 writer, N threads doing random reads
// readrandomwriterandom - N threads doing random-read, random-write
// updaterandom -- N threads doing read-modify-write for random keys
// appendrandom -- N threads doing read-modify-write with growing values
// mergerandom -- same as updaterandom/appendrandom using merge operator
// -- must be used with FLAGS_merge_operator (see below)
// seekrandom -- N random seeks
// crc32c -- repeated crc32c of 4K of data
// acquireload -- load N*1000 times
Expand Down Expand Up @@ -349,6 +353,11 @@ static auto FLAGS_bytes_per_sync =
// On true, deletes use bloom-filter and drop the delete if key not present
static bool FLAGS_filter_deletes = false;

// The merge operator to use with the database.
// If a new merge operator is specified, be sure to use fresh database
// The possible merge operators are defined in utilities/merge_operators.h
static std::string FLAGS_merge_operator = "";

namespace leveldb {

// Helper for quickly generating random data.
Expand Down Expand Up @@ -628,6 +637,7 @@ class Benchmark {
int key_size_;
int entries_per_batch_;
WriteOptions write_options_;
std::shared_ptr<MergeOperator> merge_operator_;
long reads_;
long writes_;
long readwrites_;
Expand Down Expand Up @@ -769,6 +779,7 @@ class Benchmark {
value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size),
entries_per_batch_(1),
merge_operator_(nullptr),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
Expand Down Expand Up @@ -896,6 +907,16 @@ class Benchmark {
method = &Benchmark::ReadRandomWriteRandom;
} else if (name == Slice("updaterandom")) {
method = &Benchmark::UpdateRandom;
} else if (name == Slice("appendrandom")) {
method = &Benchmark::AppendRandom;
} else if (name == Slice("mergerandom")) {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
method = nullptr;
} else {
method = &Benchmark::MergeRandom;
}
} else if (name == Slice("randomwithverify")) {
method = &Benchmark::RandomWithVerify;
} else if (name == Slice("compact")) {
Expand Down Expand Up @@ -1190,6 +1211,15 @@ class Benchmark {
options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
options.bytes_per_sync = FLAGS_bytes_per_sync;

// merge operator options
merge_operator_ = MergeOperators::CreateFromStringId(FLAGS_merge_operator);
if (merge_operator_ == nullptr && !FLAGS_merge_operator.empty()) {
fprintf(stderr, "invalid merge operator: %s\n",
FLAGS_merge_operator.c_str());
exit(1);
}
options.merge_operator = merge_operator_.get();

Status s;
if(FLAGS_read_only) {
s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
Expand Down Expand Up @@ -1915,8 +1945,6 @@ class Benchmark {

//
// Read-modify-write for random keys
//
// TODO: Implement MergeOperator tests here (Read-modify-write)
void UpdateRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
Expand Down Expand Up @@ -1963,6 +1991,100 @@ class Benchmark {
thread->stats.AddMessage(msg);
}

// Read-modify-write for random keys.
// Each operation causes the key grow by value_size (simulating an append).
// Generally used for benchmarking against merges of similar type
void AppendRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long found = 0;

// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);

if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}

if (FLAGS_get_approx) {
char key2[100];
snprintf(key2, sizeof(key2), "%016d", k + 1);
Slice skey2(key2);
Slice skey(key2);
Range range(skey, skey2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}

// Get the existing value
if (db_->Get(options, key.get(), &value).ok()) {
found++;
} else {
// If not existing, then just assume an empty string of data
value.clear();
}

if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
}

// Update the value (by appending data)
Slice operand = gen.Generate(value_size_);
if (value.size() > 0) {
// Use a delimeter to match the semantics for StringAppendOperator
value.append(1,',');
}
value.append(operand.data(), operand.size());

// Write back to the database
Status s = db_->Put(write_options_, key.get(), value);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg), "( updates:%ld found:%ld)", readwrites_, found);
thread->stats.AddMessage(msg);
}

// Read-modify-write for random keys (using MergeOperator)
// The merge operator to use should be defined by FLAGS_merge_operator
// Adjust FLAGS_value_size so that the keys are reasonable for this operator
// Assumes that the merge operator is non-null (i.e.: is well-defined)
//
// For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
// to simulate random additions over 64-bit integers using merge.
void MergeRandom(ThreadState* thread) {
RandomGenerator gen;

// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const int k = thread->rand.Next() % FLAGS_num;
unique_ptr<char []> key = GenerateKeyFromInt(k);

Status s = db_->Merge(write_options_, key.get(),
gen.Generate(value_size_));

if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
}

// Print some statistics
char msg[100];
snprintf(msg, sizeof(msg), "( updates:%ld)", readwrites_);
thread->stats.AddMessage(msg);
}

void Compact(ThreadState* thread) {
db_->CompactRange(nullptr, nullptr);
}
Expand Down Expand Up @@ -2260,11 +2382,13 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--keys_per_multiget=%d%c",
&n, &junk) == 1) {
FLAGS_keys_per_multiget = n;
} else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) {
} else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) {
FLAGS_bytes_per_sync = l;
} else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
== 1 && (n == 0 || n ==1 )) {
FLAGS_filter_deletes = n;
} else if (sscanf(argv[i], "--merge_operator=%s", buf) == 1) {
FLAGS_merge_operator = buf;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
Expand Down
26 changes: 24 additions & 2 deletions tools/db_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "util/logging.h"
#include "utilities/ttl/db_ttl.h"
#include "hdfs/env_hdfs.h"
#include "utilities/merge_operators.h"

static const long KB = 1024;

Expand Down Expand Up @@ -198,6 +199,9 @@ static bool FLAGS_filter_deletes = false;
// Level0 compaction start trigger
static int FLAGS_level0_file_num_compaction_trigger = 0;

// On true, replaces all writes with a Merge that behaves like a Put
static bool FLAGS_use_merge_put = false;

namespace leveldb {

// convert long to a big-endian slice key
Expand Down Expand Up @@ -533,6 +537,7 @@ class StressTest {
FLAGS_test_batches_snapshots ?
sizeof(long) : sizeof(long)-1)),
db_(nullptr),
merge_operator_(MergeOperators::CreatePutOperator()),
num_times_reopened_(0) {
if (FLAGS_destroy_db_initially) {
std::vector<std::string> files;
Expand All @@ -548,6 +553,7 @@ class StressTest {

~StressTest() {
delete db_;
merge_operator_ = nullptr;
delete filter_policy_;
delete prefix_extractor_;
}
Expand Down Expand Up @@ -677,7 +683,11 @@ class StressTest {
keys[i] += key.ToString();
values[i] += value.ToString();
value_slices[i] = values[i];
batch.Put(keys[i], value_slices[i]);
if (FLAGS_use_merge_put) {
batch.Merge(keys[i], value_slices[i]);
} else {
batch.Put(keys[i], value_slices[i]);
}
}

s = db_->Write(writeoptions, &batch);
Expand Down Expand Up @@ -942,7 +952,11 @@ class StressTest {
VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true);
}
thread->shared->Put(rand_key, value_base);
db_->Put(write_opts, key, v);
if (FLAGS_use_merge_put) {
db_->Merge(write_opts, key, v);
} else {
db_->Put(write_opts, key, v);
}
thread->stats.AddBytesForWrites(1, sz);
} else {
MultiPut(thread, write_opts, key, v, sz);
Expand Down Expand Up @@ -1125,6 +1139,10 @@ class StressTest {
options.purge_redundant_kvs_while_flush = false;
}

if (FLAGS_use_merge_put) {
options.merge_operator = merge_operator_.get();
}

fprintf(stdout, "DB path: [%s]\n", FLAGS_db);

Status s;
Expand Down Expand Up @@ -1170,6 +1188,7 @@ class StressTest {
const SliceTransform* prefix_extractor_;
DB* db_;
StackableDB* sdb_;
std::shared_ptr<MergeOperator> merge_operator_;
int num_times_reopened_;
};

Expand Down Expand Up @@ -1335,6 +1354,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
== 1 && (n == 0 || n == 1)) {
FLAGS_filter_deletes = n;
} else if (sscanf(argv[i], "--use_merge=%d%c", &n, &junk)
== 1 && (n == 0 || n == 1)) {
FLAGS_use_merge_put = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
Expand Down
28 changes: 25 additions & 3 deletions utilities/merge_operators.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,39 @@
#define MERGE_OPERATORS_H

#include <memory>
#include <stdio.h>

#include "leveldb/merge_operator.h"

namespace leveldb {

class MergeOperators {
public:
static std::shared_ptr<leveldb::MergeOperator> CreatePutOperator();
static std::shared_ptr<leveldb::MergeOperator> CreateUInt64AddOperator();
static std::shared_ptr<MergeOperator> CreatePutOperator();
static std::shared_ptr<MergeOperator> CreateUInt64AddOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();

// Will return a different merge operator depending on the string.
// TODO: Hook the "name" up to the actual Name() of the MergeOperators?
static std::shared_ptr<MergeOperator> CreateFromStringId(
const std::string& name) {
if (name == "put") {
return CreatePutOperator();
} else if ( name == "uint64add") {
return CreateUInt64AddOperator();
} else if (name == "stringappend") {
return CreateStringAppendOperator();
} else if (name == "stringappendtest") {
return CreateStringAppendTESTOperator();
} else {
// Empty or unknown, just return nullptr
return nullptr;
}
}

};

}
} // namespace leveldb

#endif
4 changes: 4 additions & 0 deletions utilities/merge_operators/string_append/stringappend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ const char* StringAppendOperator::Name() const {
return "StringAppendOperator";
}

std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
return std::make_shared<StringAppendOperator>(',');
}

} // namespace leveldb


Expand Down
37 changes: 25 additions & 12 deletions utilities/merge_operators/string_append/stringappend2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,38 @@ bool StringAppendTESTOperator::PartialMerge(const Slice& key,
std::string* new_value,
Logger* logger) const {
return false;
}

// // Clear the *new_value for writing.
// assert(new_value);
// new_value->clear();
//
// // Generic append
// // Reserve correct size for *new_value, and apply concatenation.
// new_value->reserve(left_operand.size() + 1 + right_operand.size());
// new_value->assign(left_operand.data(), left_operand.size());
// new_value->append(1,delim_);
// new_value->append(right_operand.data(), right_operand.size());
//
// return true;
// A version of PartialMerge that actually performs "partial merging".
// Use this to simulate the exact behaviour of the StringAppendOperator.
bool StringAppendTESTOperator::_AssocPartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
// Clear the *new_value for writing.
assert(new_value);
new_value->clear();

// Generic append
// Reserve correct size for *new_value, and apply concatenation.
new_value->reserve(left_operand.size() + 1 + right_operand.size());
new_value->assign(left_operand.data(), left_operand.size());
new_value->append(1,delim_);
new_value->append(right_operand.data(), right_operand.size());

return true;
}

const char* StringAppendTESTOperator::Name() const {
return "StringAppendTESTOperator";
}


std::shared_ptr<MergeOperator>
MergeOperators::CreateStringAppendTESTOperator() {
return std::make_shared<StringAppendTESTOperator>(',');
}

} // namespace leveldb

Loading

0 comments on commit ad48c3c

Please sign in to comment.