Skip to content

Commit

Permalink
Better locking in vectorrep that increases throughput to match speed …
Browse files Browse the repository at this point in the history
…of storage.

Summary:
There is a use-case where we want to insert data into rocksdb as
fast as possible. Vector rep is used for this purpose.

The background flush thread needs to flush the vectorrep to
storage. It acquires the dblock then sorts the vector, releases
the dblock and then writes the sorted vector to storage. This is
suboptimal because the lock is held during the sort, which
prevents new writes for occuring.

This patch moves the sorting of the vector rep to outside the
db mutex. Performance is now as fastas the underlying storage
system. If you are doing buffered writes to rocksdb files, then
you can observe throughput upwards of 200 MB/sec writes.

This is an early draft and not yet ready to be reviewed.

Test Plan:
make check

Task ID: #

Blame Rev:

Reviewers: haobo

Reviewed By: haobo

CC: leveldb, haobo

Differential Revision: https://reviews.facebook.net/D12987
  • Loading branch information
dhruba committed Sep 20, 2013
1 parent 4335418 commit 5e9f3a9
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 20 deletions.
57 changes: 55 additions & 2 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/statistics.h"
#include "port/port.h"
Expand Down Expand Up @@ -79,6 +80,7 @@ static const char* FLAGS_benchmarks =
"snappycomp,"
"snappyuncomp,"
"acquireload,"
"fillfromstdin,"
;
// the maximum size of key in bytes
static const int MAX_KEY_SIZE = 128;
Expand Down Expand Up @@ -906,6 +908,9 @@ class Benchmark {
} else if (name == Slice("fillrandom")) {
fresh_db = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("fillfromstdin")) {
fresh_db = true;
method = &Benchmark::WriteFromStdin;
} else if (name == Slice("filluniquerandom")) {
fresh_db = true;
if (num_threads > 1) {
Expand Down Expand Up @@ -1342,6 +1347,54 @@ class Benchmark {
DoWrite(thread, UNIQUE_RANDOM);
}

void writeOrFail(WriteBatch& batch) {
Status s = db_->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
}

void WriteFromStdin(ThreadState* thread) {
size_t count = 0;
WriteBatch batch;
const size_t bufferLen = 32 << 20;
unique_ptr<char[]> line = unique_ptr<char[]>(new char[bufferLen]);
char* linep = line.get();
const int batchSize = 100 << 10;
const char columnSeparator = '\t';
const char lineSeparator = '\n';

while (fgets(linep, bufferLen, stdin) != nullptr) {
++count;
char* tab = std::find(linep, linep + bufferLen, columnSeparator);
if (tab == linep + bufferLen) {
fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count);
continue;
}
Slice key(linep, tab - linep);
tab++;
char* endLine = std::find(tab, linep + bufferLen, lineSeparator);
if (endLine == linep + bufferLen) {
fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count);
continue;
}
Slice value(tab, endLine - tab);
thread->stats.FinishedSingleOp(db_);
thread->stats.AddBytes(endLine - linep - 1);

if (batch.Count() < batchSize) {
batch.Put(key, value);
continue;
}
writeOrFail(batch);
batch.Clear();
}
if (batch.Count() > 0) {
writeOrFail(batch);
}
}

void DoWrite(ThreadState* thread, WriteMode write_mode) {
const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
const int num_ops = writes_ == 0 ? num_ : writes_ ;
Expand Down Expand Up @@ -2320,8 +2373,8 @@ int main(int argc, char** argv) {
} else {
FLAGS_key_size = n;
}
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%lld%c", &ll, &junk) == 1) {
FLAGS_write_buffer_size = ll;
} else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) {
FLAGS_max_write_buffer_number = n;
} else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c",
Expand Down
65 changes: 47 additions & 18 deletions util/vectorrep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ class VectorRep : public MemTableRep {
virtual ~VectorRep() override { }

class Iterator : public MemTableRep::Iterator {
class VectorRep* vrep_;
std::shared_ptr<std::vector<const char*>> bucket_;
typename std::vector<const char*>::const_iterator cit_;
typename std::vector<const char*>::const_iterator mutable cit_;
const KeyComparator& compare_;
bool mutable sorted_;
void DoSort() const;
public:
explicit Iterator(std::shared_ptr<std::vector<const char*>> bucket,
explicit Iterator(class VectorRep* vrep,
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare);

// Initialize an iterator over the specified collection.
Expand Down Expand Up @@ -82,11 +86,12 @@ class VectorRep : public MemTableRep {
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;

private:
friend class Iterator;
typedef std::vector<const char*> Bucket;
std::shared_ptr<Bucket> bucket_;
mutable port::RWMutex rwlock_;
bool immutable_ = false;
bool sorted_ = false;
bool immutable_;
bool sorted_;
const KeyComparator& compare_;
};

Expand Down Expand Up @@ -119,16 +124,42 @@ size_t VectorRep::ApproximateMemoryUsage() {

VectorRep::VectorRep(const KeyComparator& compare, Arena* arena, size_t count)
: bucket_(new Bucket(count)),
immutable_(false),
sorted_(false),
compare_(compare) { }

VectorRep::Iterator::Iterator(std::shared_ptr<std::vector<const char*>> bucket,
VectorRep::Iterator::Iterator(class VectorRep* vrep,
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare)
: bucket_(bucket),
cit_(bucket_->begin()),
compare_(compare) { }
: vrep_(vrep),
bucket_(bucket),
cit_(nullptr),
compare_(compare),
sorted_(false) { }

void VectorRep::Iterator::DoSort() const {
// vrep is non-null means that we are working on an immutable memtable
if (!sorted_ && vrep_ != nullptr) {
WriteLock l(&vrep_->rwlock_);
if (!vrep_->sorted_) {
std::sort(bucket_->begin(), bucket_->end(), Compare(compare_));
cit_ = bucket_->begin();
vrep_->sorted_ = true;
}
sorted_ = true;
}
if (!sorted_) {
std::sort(bucket_->begin(), bucket_->end(), Compare(compare_));
cit_ = bucket_->begin();
sorted_ = true;
}
assert(sorted_);
assert(vrep_ == nullptr || vrep_->sorted_);
}

// Returns true iff the iterator is positioned at a valid node.
bool VectorRep::Iterator::Valid() const {
DoSort();
return cit_ != bucket_->end();
}

Expand Down Expand Up @@ -165,6 +196,7 @@ void VectorRep::Iterator::Prev() {

// Advance to the first entry with a key >= target
void VectorRep::Iterator::Seek(const char* target) {
DoSort();
// Do binary search to find first value not less than the target
cit_ = std::equal_range(bucket_->begin(),
bucket_->end(),
Expand All @@ -177,34 +209,31 @@ void VectorRep::Iterator::Seek(const char* target) {
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void VectorRep::Iterator::SeekToFirst() {
DoSort();
cit_ = bucket_->begin();
}

// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void VectorRep::Iterator::SeekToLast() {
DoSort();
cit_ = bucket_->end();
if (bucket_->size() != 0) {
--cit_;
}
}

std::shared_ptr<MemTableRep::Iterator> VectorRep::GetIterator() {
std::shared_ptr<Bucket> tmp;
ReadLock l(&rwlock_);
// Do not sort here. The sorting would be done the first time
// a Seek is performed on the iterator.
if (immutable_) {
rwlock_.Unlock();
rwlock_.WriteLock();
tmp = bucket_;
if (!sorted_) {
std::sort(tmp->begin(), tmp->end(), Compare(compare_));
sorted_ = true;
}
return std::make_shared<Iterator>(this, bucket_, compare_);
} else {
std::shared_ptr<Bucket> tmp;
tmp.reset(new Bucket(*bucket_)); // make a copy
std::sort(tmp->begin(), tmp->end(), Compare(compare_));
return std::make_shared<Iterator>(nullptr, tmp, compare_);
}
return std::make_shared<Iterator>(tmp, compare_);
}
} // anon namespace

Expand Down

0 comments on commit 5e9f3a9

Please sign in to comment.