Add three new MemTableRep's
Summary:
This patch adds three new MemTableRep's: UnsortedRep, PrefixHashRep, and VectorRep.

UnsortedRep stores keys in an std::unordered_map of std::sets. When an iterator is requested, it dumps the keys into an std::set and iterates over that.

VectorRep stores keys in an std::vector. When an iterator is requested, it creates a copy of the vector and sorts it using std::sort. The iterator accesses that new vector.
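
Both UnsortedRep and VectorRep keep writes cheap by deferring ordering until an iterator is requested. The following is a minimal sketch of the copy-then-sort idea behind VectorRep's iterator; it is an illustration only (the KeyComparator and raw const char* keys are simplified stand-ins for the real MemTableRep plumbing):

#include <algorithm>
#include <cstring>
#include <vector>

// Simplified stand-in for the rep's key comparator.
struct KeyComparator {
  bool operator()(const char* a, const char* b) const {
    return std::strcmp(a, b) < 0;
  }
};

class VectorRepSketch {
 public:
  // Insert is amortized O(1): just append, no ordering is maintained.
  void Insert(const char* key) { keys_.push_back(key); }

  // Iteration takes a snapshot of the vector and sorts the copy,
  // leaving the live (still-mutable) rep untouched.
  std::vector<const char*> SortedSnapshot() const {
    std::vector<const char*> copy(keys_);
    std::sort(copy.begin(), copy.end(), KeyComparator());
    return copy;
  }

 private:
  std::vector<const char*> keys_;
};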

PrefixHashRep stores keys in an unordered_map mapping prefixes to ordered sets.
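
The prefix-hash layout trades full ordering for cheap per-prefix access: hash on the prefix to find a bucket, keep each bucket ordered. A rough sketch of that shape, with a hypothetical ExtractPrefix helper standing in for the real SliceTransform:

#include <algorithm>
#include <set>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the fixed-length prefix extractor used in the patch.
inline std::string ExtractPrefix(const std::string& key, size_t prefix_len) {
  return key.substr(0, std::min(prefix_len, key.size()));
}

class PrefixHashRepSketch {
 public:
  explicit PrefixHashRepSketch(size_t prefix_len) : prefix_len_(prefix_len) {}

  void Insert(const std::string& key) {
    // O(1) bucket lookup by prefix, then an ordered insert within the bucket.
    buckets_[ExtractPrefix(key, prefix_len_)].insert(key);
  }

  // A prefix scan only touches the single ordered bucket for that prefix.
  const std::set<std::string>& GetPrefixBucket(const std::string& prefix) const {
    static const std::set<std::string> kEmpty;
    auto it = buckets_.find(prefix);
    return it == buckets_.end() ? kEmpty : it->second;
  }

 private:
  size_t prefix_len_;
  std::unordered_map<std::string, std::set<std::string>> buckets_;
};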

I also made one API change: I added a function MemTableRep::MarkImmutable. This function is called when the rep is added to the immutable list. It doesn't do anything yet, but it seems like it could be useful. In particular, for VectorRep, it means we could elide the extra copy and just sort in place. The only reason I haven't done that yet is that the use of the ArenaAllocator complicates things (I can elaborate on this if needed).
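
As a sketch of how MarkImmutable could eventually help VectorRep: once the rep knows no more writes will arrive, it could sort its vector in place a single time instead of copying on every iterator. This is an illustration of the idea, not code from the patch (which keeps the copy because of the ArenaAllocator issue mentioned above):

#include <algorithm>
#include <cassert>
#include <cstring>
#include <vector>

class ImmutableAwareVectorRep {
 public:
  void Insert(const char* key) {
    assert(!immutable_);  // writes stop once the rep is marked immutable
    keys_.push_back(key);
  }

  // Called when the memtable moves to the immutable list: since no more
  // inserts can happen, sorting in place once is safe, and the per-iterator
  // copy can be skipped from then on.
  void MarkImmutable() {
    std::sort(keys_.begin(), keys_.end(),
              [](const char* a, const char* b) { return std::strcmp(a, b) < 0; });
    immutable_ = true;
  }

  const std::vector<const char*>& SortedKeys() const {
    assert(immutable_);
    return keys_;
  }

 private:
  std::vector<const char*> keys_;
  bool immutable_ = false;
};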

Test Plan:
make -j32 check
./db_stress --memtablerep=vector
./db_stress --memtablerep=unsorted
./db_stress --memtablerep=prefixhash --prefix_size=10
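
The same selection the --memtablerep flag makes in db_bench/db_stress can be made programmatically through Options::memtable_factory. A hedged sketch follows; the header paths and ownership details are assumptions, and the transform handed to the factory must match options.prefix_extractor, otherwise SanitizeOptions falls back to the skip list (see the db/db_impl.cc hunk below):

#include "leveldb/memtablerep.h"      // assumed location of the new factories
#include "leveldb/options.h"
#include "leveldb/slice_transform.h"  // assumed location of NewFixedPrefixTransform

leveldb::Options MakeOptionsWithPrefixHashRep() {
  leveldb::Options options;
  // Use one transform for both fields so the SanitizeOptions check passes.
  auto* transform = leveldb::NewFixedPrefixTransform(10);
  options.prefix_extractor = transform;
  options.memtable_factory.reset(new leveldb::PrefixHashRepFactory(transform));
  return options;
}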

Reviewers: dhruba, haobo, emayanke

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12117
jpaton committed Aug 23, 2013
1 parent 17dc128 commit 74781a0
Showing 22 changed files with 1,038 additions and 69 deletions.
68 changes: 68 additions & 0 deletions db/db_bench.cc
@@ -353,6 +353,18 @@ static auto FLAGS_bytes_per_sync =
// On true, deletes use bloom-filter and drop the delete if key not present
static bool FLAGS_filter_deletes = false;

// Control the prefix size for PrefixHashRep
static int FLAGS_prefix_size = 0;

enum RepFactory {
kSkipList,
kPrefixHash,
kUnsorted,
kVectorRep
};

static enum RepFactory FLAGS_rep_factory;

// The merge operator to use with the database.
// If a new merge operator is specified, be sure to use fresh database
// The possible merge operators are defined in utilities/merge_operators.h
@@ -673,6 +685,21 @@ class Benchmark {
break;
}

switch (FLAGS_rep_factory) {
case kPrefixHash:
fprintf(stdout, "Memtablerep: prefix_hash\n");
break;
case kSkipList:
fprintf(stdout, "Memtablerep: skip_list\n");
break;
case kUnsorted:
fprintf(stdout, "Memtablerep: unsorted\n");
break;
case kVectorRep:
fprintf(stdout, "Memtablerep: vector\n");
break;
}

PrintWarnings();
fprintf(stdout, "------------------------------------------------\n");
}
@@ -1159,6 +1186,31 @@ class Benchmark {
options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier;
options.filter_deletes = FLAGS_filter_deletes;
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n");
exit(1);
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(
new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size))
);
break;
case kUnsorted:
options.memtable_factory.reset(
new UnsortedRepFactory
);
break;
case kSkipList:
// no need to do anything
break;
case kVectorRep:
options.memtable_factory.reset(
new VectorRepFactory
);
break;
}
if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional.size() !=
(unsigned int)FLAGS_num_levels) {
@@ -2324,6 +2376,19 @@ int main(int argc, char** argv) {
else {
fprintf(stdout, "Cannot parse %s\n", argv[i]);
}
} else if (strncmp(argv[i], "--memtablerep=", 14) == 0) {
const char* ctype = argv[i] + 14;
if (!strcasecmp(ctype, "skip_list"))
FLAGS_rep_factory = kSkipList;
else if (!strcasecmp(ctype, "prefix_hash"))
FLAGS_rep_factory = kPrefixHash;
else if (!strcasecmp(ctype, "unsorted"))
FLAGS_rep_factory = kUnsorted;
else if (!strcasecmp(ctype, "vector"))
FLAGS_rep_factory = kVectorRep;
else {
fprintf(stdout, "Cannot parse %s\n", argv[i]);
}
} else if (sscanf(argv[i], "--min_level_to_compress=%d%c", &n, &junk) == 1
&& n >= 0) {
FLAGS_min_level_to_compress = n;
@@ -2338,6 +2403,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) {
FLAGS_stats_per_interval = n;
} else if (sscanf(argv[i], "--prefix_size=%d%c", &n, &junk) == 1 &&
n >= 0 && n < 2000000000) {
FLAGS_prefix_size = n;
} else if (sscanf(argv[i], "--soft_rate_limit=%lf%c", &d, &junk) == 1 &&
d > 0.0) {
FLAGS_soft_rate_limit = d;
18 changes: 16 additions & 2 deletions db/db_impl.cc
@@ -163,6 +163,19 @@ Options SanitizeOptions(const std::string& dbname,
result.compaction_filter_factory->CreateCompactionFilter().get()) {
Log(result.info_log, "Both filter and factory specified. Using filter");
}
if (result.prefix_extractor) {
// If a prefix extractor has been supplied and a PrefixHashRepFactory is
// being used, make sure that the latter uses the former as its transform
// function.
auto factory = dynamic_cast<PrefixHashRepFactory*>(
result.memtable_factory.get());
if (factory != nullptr && factory->transform_ != result.prefix_extractor) {
Log(result.info_log, "A prefix hash representation factory was supplied "
"whose prefix extractor does not match options.prefix_extractor. "
"Falling back to skip list representation factory");
result.memtable_factory = std::make_shared<SkipListFactory>();
}
}
return result;
}

@@ -2143,7 +2156,8 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect together all needed child iterators for mem
std::vector<Iterator*> list;
mem_->Ref();
list.push_back(mem_->NewIterator());
list.push_back(mem_->NewIterator(options.prefix));

cleanup->mem.push_back(mem_);

// Collect together all needed child iterators for imm_
@@ -2152,7 +2166,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
for (unsigned int i = 0; i < immutables.size(); i++) {
MemTable* m = immutables[i];
m->Ref();
list.push_back(m->NewIterator());
list.push_back(m->NewIterator(options.prefix));
cleanup->mem.push_back(m);
}

2 changes: 0 additions & 2 deletions db/db_statistics.h
@@ -62,5 +62,3 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
} // namespace leveldb

#endif // LEVELDB_STORAGE_DB_DB_STATISTICS_H_


41 changes: 33 additions & 8 deletions db/db_test.cc
@@ -207,9 +207,12 @@ class DBTest {
private:
const FilterPolicy* filter_policy_;

protected:
// Sequence of option configurations to try
enum OptionConfig {
kDefault,
kVectorRep,
kUnsortedRep,
kMergePut,
kFilter,
kUncompressed,
@@ -219,6 +222,7 @@ class DBTest {
kCompactOnFlush,
kPerfOptions,
kDeletesFilterFirst,
kPrefixHashRep,
kUniversalCompaction,
kEnd
};
@@ -293,6 +297,10 @@ class DBTest {
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kPrefixHashRep:
options.memtable_factory.reset(new
PrefixHashRepFactory(NewFixedPrefixTransform(1)));
break;
case kMergePut:
options.merge_operator = MergeOperators::CreatePutOperator();
break;
@@ -321,6 +329,12 @@ class DBTest {
case kDeletesFilterFirst:
options.filter_deletes = true;
break;
case kUnsortedRep:
options.memtable_factory.reset(new UnsortedRepFactory);
break;
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory);
break;
case kUniversalCompaction:
options.compaction_style = kCompactionStyleUniversal;
break;
@@ -3509,10 +3523,13 @@ class ModelDB: public DB {
KVMap map_;
};

static std::string RandomKey(Random* rnd) {
int len = (rnd->OneIn(3)
? 1 // Short sometimes to encourage collisions
: (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
static std::string RandomKey(Random* rnd, int minimum = 0) {
int len;
do {
len = (rnd->OneIn(3)
? 1 // Short sometimes to encourage collisions
: (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
} while (len < minimum);
return test::RandomKey(rnd, len);
}

@@ -3574,8 +3591,12 @@ TEST(DBTest, Randomized) {
for (int step = 0; step < N; step++) {
// TODO(sanjay): Test Get() works
int p = rnd.Uniform(100);
int minimum = 0;
if (option_config_ == kPrefixHashRep) {
minimum = 1;
}
if (p < 45) { // Put
k = RandomKey(&rnd);
k = RandomKey(&rnd, minimum);
v = RandomString(&rnd,
rnd.OneIn(20)
? 100 + rnd.Uniform(100)
@@ -3584,7 +3605,7 @@
ASSERT_OK(db_->Put(WriteOptions(), k, v));

} else if (p < 90) { // Delete
k = RandomKey(&rnd);
k = RandomKey(&rnd, minimum);
ASSERT_OK(model.Delete(WriteOptions(), k));
ASSERT_OK(db_->Delete(WriteOptions(), k));

@@ -3594,7 +3615,7 @@
const int num = rnd.Uniform(8);
for (int i = 0; i < num; i++) {
if (i == 0 || !rnd.OneIn(10)) {
k = RandomKey(&rnd);
k = RandomKey(&rnd, minimum);
} else {
// Periodically re-use the same key from the previous iter, so
// we have multiple entries in the write batch for the same key
@@ -3750,19 +3771,23 @@ TEST(DBTest, PrefixScan) {
snprintf(buf, sizeof(buf), "03______:");
prefix = Slice(buf, 8);
key = Slice(buf, 9);
auto prefix_extractor = NewFixedPrefixTransform(8);
auto memtable_factory =
std::make_shared<PrefixHashRepFactory>(prefix_extractor);

// db configs
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.block_cache = NewLRUCache(0); // Prevent cache hits
options.filter_policy = NewBloomFilterPolicy(10);
options.prefix_extractor = NewFixedPrefixTransform(8);
options.prefix_extractor = prefix_extractor;
options.whole_key_filtering = false;
options.disable_auto_compactions = true;
options.max_background_compactions = 2;
options.create_if_missing = true;
options.disable_seek_compaction = true;
options.memtable_factory = memtable_factory;

// prefix specified, with blooms: 2 RAND I/Os
// SeekToFirst
30 changes: 19 additions & 11 deletions db/memtable.cc
@@ -12,16 +12,10 @@
#include "leveldb/iterator.h"
#include "leveldb/merge_operator.h"
#include "util/coding.h"
#include "util/murmurhash.h"

namespace leveldb {

static Slice GetLengthPrefixedSlice(const char* data) {
uint32_t len;
const char* p = data;
p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted
return Slice(p, len);
}

MemTable::MemTable(const InternalKeyComparator& cmp,
std::shared_ptr<MemTableRepFactory> table_factory,
int numlevel,
@@ -42,7 +36,8 @@ MemTable::~MemTable() {
}

size_t MemTable::ApproximateMemoryUsage() {
return arena_impl_.ApproximateMemoryUsage();
return arena_impl_.ApproximateMemoryUsage() +
table_->ApproximateMemoryUsage();
}

int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
@@ -53,6 +48,11 @@ int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
return comparator.Compare(a, b);
}

Slice MemTableRep::UserKey(const char* key) const {
Slice slice = GetLengthPrefixedSlice(key);
return Slice(slice.data(), slice.size() - 8);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
@@ -68,6 +68,9 @@ class MemTableIterator: public Iterator {
explicit MemTableIterator(MemTableRep* table)
: iter_(table->GetIterator()) { }

MemTableIterator(MemTableRep* table, const Slice* prefix)
: iter_(table->GetPrefixIterator(*prefix)) { }

virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_->SeekToFirst(); }
@@ -93,8 +96,12 @@ class MemTableIterator: public Iterator {
void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator() {
return new MemTableIterator(table_.get());
Iterator* MemTable::NewIterator(const Slice* prefix) {
if (prefix) {
return new MemTableIterator(table_.get(), prefix);
} else {
return new MemTableIterator(table_.get());
}
}

void MemTable::Add(SequenceNumber s, ValueType type,
@@ -132,7 +139,8 @@ void MemTable::Add(SequenceNumber s, ValueType type,
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options) {
Slice memkey = key.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(table_.get()->GetIterator());
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(memkey.data());

// It is the caller's responsibility to allocate/delete operands list
9 changes: 8 additions & 1 deletion db/memtable.h
@@ -61,7 +61,11 @@ class MemTable {
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/dbformat.{h,cc} module.
Iterator* NewIterator();
//
// If a prefix is supplied, it is passed to the underlying MemTableRep as a
// hint that the iterator only needs to support access to keys with that
// prefix.
Iterator* NewIterator(const Slice* prefix = nullptr);

// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
@@ -96,6 +100,9 @@ class MemTable {
// memstore is flushed to storage
void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; }

// Notify the underlying storage that no more items will be added
void MarkImmutable() { table_->MarkReadOnly(); }

private:
~MemTable(); // Private since only Unref() should be used to delete it
friend class MemTableIterator;
1 change: 1 addition & 0 deletions db/memtablelist.cc
@@ -172,6 +172,7 @@ void MemTableList::Add(MemTable* m) {
assert(size_ >= num_flush_not_started_);
size_++;
memlist_.push_front(m);
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
imm_flush_needed.Release_Store((void *)1);
2 changes: 1 addition & 1 deletion db/write_batch_test.cc
@@ -5,10 +5,10 @@
#include "leveldb/db.h"

#include <memory>
#include "db/skiplistrep.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/env.h"
#include "leveldb/memtablerep.h"
#include "util/logging.h"
#include "util/testharness.h"

3 changes: 3 additions & 0 deletions include/leveldb/arena.h
@@ -8,6 +8,9 @@
#ifndef STORAGE_LEVELDB_INCLUDE_ARENA_H_
#define STORAGE_LEVELDB_INCLUDE_ARENA_H_

#include <limits>
#include <memory>

namespace leveldb {

class Arena {