Skip to content

Commit

Permalink
plain table reader: non-mmap mode to keep two recent buffers
Browse files Browse the repository at this point in the history
Summary: In the plain table reader's non-mmap mode, we keep only the most recent read buffer. However, during a binary search we are likely to come back and read a location we have already read. To avoid an extra pread in that case, we now keep two read buffers, which should cover most cases.

Test Plan:
1. run tests
2. check the optimization works through strace when running
./table_reader_bench -mmap_read=false --num_keys2=1 -num_keys1=5000 -table_factory=plain_table --iterator --through_db

Reviewers: anthony, rven, kradhakrishnan, igor, yhchiang, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D51171
  • Loading branch information
siying committed Jan 8, 2016
1 parent 7ece10e commit 9a8e3f7
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 78 deletions.
54 changes: 54 additions & 0 deletions db/plain_table_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "table/bloom_block.h"
#include "table/table_builder.h"
#include "table/plain_table_factory.h"
#include "table/plain_table_key_coding.h"
#include "table/plain_table_reader.h"
#include "util/hash.h"
#include "util/logging.h"
Expand All @@ -41,6 +42,59 @@
using std::unique_ptr;

namespace rocksdb {
// Fixture for the PlainTableFileReader non-mmap read test below.
class PlainTableKeyDecoderTest : public testing::Test {};

// Checks that PlainTableFileReader returns the right bytes in non-mmap mode
// and that its internal buffers keep the number of underlying file reads at
// the expected level for several access patterns.
TEST_F(PlainTableKeyDecoderTest, ReadNonMmap) {
  std::string tmp;
  Random rnd(301);
  const uint32_t kLength = 2222;
  Slice contents = test::RandomString(&rnd, kLength, &tmp);
  // Keep the raw pointer so the read counter can be inspected and reset later.
  // NOTE(review): ownership presumably passes to the reader created by
  // GetRandomAccessFileReader() -- confirm against its implementation.
  test::StringSource* string_source =
      new test::StringSource(contents, 0, false);

  unique_ptr<RandomAccessFileReader> file_reader(
      test::GetRandomAccessFileReader(string_source));
  unique_ptr<PlainTableReaderFileInfo> file_info(new PlainTableReaderFileInfo(
      std::move(file_reader), EnvOptions(), kLength));

  {
    PlainTableFileReader reader(file_info.get());

    // Scan the whole file front to back in small chunks and verify each chunk
    // against the source string.
    const uint32_t kReadSize = 77;
    for (uint32_t pos = 0; pos < kLength; pos += kReadSize) {
      uint32_t read_size = std::min(kLength - pos, kReadSize);
      Slice out;
      ASSERT_TRUE(reader.Read(pos, read_size, &out));
      ASSERT_EQ(0, out.compare(tmp.substr(pos, read_size)));
    }

    // The scan must need fewer than half as many file reads as chunk reads.
    // total_reads() returns int; cast it so both operands are unsigned and the
    // comparison does not mix signedness.
    ASSERT_LT(static_cast<uint32_t>(string_source->total_reads()),
              kLength / kReadSize / 2);
  }

  // Each inner list is a sequence of {file offset, length} reads issued
  // against a fresh reader.
  std::vector<std::vector<std::pair<uint32_t, uint32_t>>> reads = {
      {{600, 30}, {590, 30}, {600, 20}, {600, 40}},
      {{800, 20}, {100, 20}, {500, 20}, {1500, 20}, {100, 20}, {80, 20}},
      {{1000, 20}, {500, 20}, {1000, 50}},
      {{1000, 20}, {500, 20}, {500, 20}},
      {{1000, 20}, {500, 20}, {200, 20}, {500, 20}},
      {{1000, 20}, {500, 20}, {200, 20}, {1000, 50}},
      {{600, 500}, {610, 20}, {100, 20}},
      {{500, 100}, {490, 100}, {550, 50}},
  };

  // num_file_reads[i] is the number of underlying file reads that sequence
  // reads[i] is expected to trigger.
  std::vector<int> num_file_reads = {2, 6, 2, 2, 4, 3, 2, 2};

  for (size_t i = 0; i < reads.size(); i++) {
    // Reset the counter and start from a reader with no buffered data.
    string_source->set_total_reads(0);
    PlainTableFileReader reader(file_info.get());
    for (const auto& p : reads[i]) {
      Slice out;
      ASSERT_TRUE(reader.Read(p.first, p.second, &out));
      ASSERT_EQ(0, out.compare(tmp.substr(p.first, p.second)));
    }
    ASSERT_EQ(num_file_reads[i], string_source->total_reads());
  }
}

class PlainTableDBTest : public testing::Test,
public testing::WithParamInterface<bool> {
Expand Down
93 changes: 54 additions & 39 deletions table/plain_table_key_coding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,47 +164,62 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
return Status::OK();
}

inline bool PlainTableKeyDecoder::FileReader::Read(uint32_t file_offset,
uint32_t len, Slice* out) {
if (file_info_->is_mmap_mode) {
assert(file_offset + len <= file_info_->data_end_offset);
*out = Slice(file_info_->file_data.data() + file_offset, len);
return true;
} else {
return ReadNonMmap(file_offset, len, out);
}
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
uint32_t len) {
assert(file_offset + len <= file_info_->data_end_offset);
return Slice(buffer->buf.get() + (file_offset - buffer->buf_start_offset),
len);
}

bool PlainTableKeyDecoder::FileReader::ReadNonMmap(uint32_t file_offset,
uint32_t len, Slice* out) {
bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
Slice* out) {
const uint32_t kPrefetchSize = 256u;
if (file_offset < buf_start_offset_ ||
file_offset + len > buf_start_offset_ + buf_len_) {
// Load buffer
assert(file_offset + len <= file_info_->data_end_offset);
uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
std::max(kPrefetchSize, len));
if (size_to_read > buf_capacity_) {
buf_.reset(new char[size_to_read]);
buf_capacity_ = size_to_read;
buf_len_ = 0;
}
Slice read_result;
Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
buf_.get());
if (!s.ok()) {
status_ = s;
return false;

// Try to read from buffers.
for (uint32_t i = 0; i < num_buf_; i++) {
Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
if (file_offset >= buffer->buf_start_offset &&
file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
*out = GetFromBuffer(buffer, file_offset, len);
return true;
}
buf_start_offset_ = file_offset;
buf_len_ = size_to_read;
}
*out = Slice(buf_.get() + (file_offset - buf_start_offset_), len);

Buffer* new_buffer;
// Data needed is not in any of the buffer. Allocate a new buffer.
if (num_buf_ < buffers_.size()) {
// Add a new buffer
new_buffer = new Buffer();
buffers_[num_buf_++].reset(new_buffer);
} else {
// Now simply replace the last buffer. Can improve the placement policy
// if needed.
new_buffer = buffers_[num_buf_ - 1].get();
}

assert(file_offset + len <= file_info_->data_end_offset);
uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
std::max(kPrefetchSize, len));
if (size_to_read > new_buffer->buf_capacity) {
new_buffer->buf.reset(new char[size_to_read]);
new_buffer->buf_capacity = size_to_read;
new_buffer->buf_len = 0;
}
Slice read_result;
Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
new_buffer->buf.get());
if (!s.ok()) {
status_ = s;
return false;
}
new_buffer->buf_start_offset = file_offset;
new_buffer->buf_len = size_to_read;
*out = GetFromBuffer(new_buffer, file_offset, len);
return true;
}

inline bool PlainTableKeyDecoder::FileReader::ReadVarint32(
uint32_t offset, uint32_t* out, uint32_t* bytes_read) {
inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
uint32_t* bytes_read) {
if (file_info_->is_mmap_mode) {
const char* start = file_info_->file_data.data() + offset;
const char* limit =
Expand All @@ -218,8 +233,8 @@ inline bool PlainTableKeyDecoder::FileReader::ReadVarint32(
}
}

bool PlainTableKeyDecoder::FileReader::ReadVarint32NonMmap(
uint32_t offset, uint32_t* out, uint32_t* bytes_read) {
bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
uint32_t* bytes_read) {
const char* start;
const char* limit;
const uint32_t kMaxVarInt32Size = 6u;
Expand Down Expand Up @@ -298,7 +313,7 @@ Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info_->is_mmap_mode) {
if (!file_reader_.file_info()->is_mmap_mode) {
cur_key_.SetInternalKey(*parsed_key);
parsed_key->user_key = Slice(cur_key_.GetKey().data(), user_key_size);
if (internal_key != nullptr) {
Expand Down Expand Up @@ -348,14 +363,14 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey(
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info_->is_mmap_mode ||
if (!file_reader_.file_info()->is_mmap_mode ||
(internal_key != nullptr && !decoded_internal_key_valid)) {
// In non-mmap mode, always need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
cur_key_.SetInternalKey(*parsed_key);
saved_user_key_ = cur_key_.GetKey();
if (!file_reader_.file_info_->is_mmap_mode) {
if (!file_reader_.file_info()->is_mmap_mode) {
parsed_key->user_key = Slice(cur_key_.GetKey().data(), size);
}
if (internal_key != nullptr) {
Expand Down Expand Up @@ -394,7 +409,7 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey(
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info_->is_mmap_mode) {
if (!file_reader_.file_info()->is_mmap_mode) {
// In non-mmap mode, we need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
Expand Down
107 changes: 70 additions & 37 deletions table/plain_table_key_coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "rocksdb/slice.h"
#include "db/dbformat.h"
#include "table/plain_table_reader.h"

namespace rocksdb {

Expand Down Expand Up @@ -51,6 +52,74 @@ class PlainTableKeyEncoder {
IterKey pre_prefix_;
};

// Reads byte ranges of a plain table file, either straight from the mmaped
// file contents or, in non-mmap mode, through a small set of internal read
// buffers.
class PlainTableFileReader {
 public:
  // Stores `_file_info` as-is; this class never frees it, so it must outlive
  // the reader. Starts with no buffers allocated.
  explicit PlainTableFileReader(const PlainTableReaderFileInfo* _file_info)
      : file_info_(_file_info), num_buf_(0) {}

  // In mmaped mode, the results point to mmaped area of the file, which
  // means it is always valid before closing the file.
  // In non-mmap mode, the results point to an internal buffer. If the caller
  // makes another read call, the results may not be valid. So callers should
  // make a copy when needed.
  // In order to save read calls to files, we keep two internal buffers:
  // the first read and the most recent read. This is efficient because it
  // covers these two common use cases:
  // (1) hash index only identifies one location, we read the key to verify
  //     the location, and read key and value if it is the right location.
  // (2) after hash index checking, we identify two locations (because of
  //     hash bucket conflicts), we binary search the two locations to see
  //     which one is what we need and start to read from the location.
  // These two most common use cases will be covered by the two buffers
  // so that we don't need to re-read the same location.
  // Currently we keep a fixed size buffer. If a read doesn't exactly fit
  // the buffer, we replace the second buffer with the location user reads.
  //
  // If return false, status code is stored in status_.
  bool Read(uint32_t file_offset, uint32_t len, Slice* out) {
    if (file_info_->is_mmap_mode) {
      // The requested range must lie within the data section of the file.
      assert(file_offset + len <= file_info_->data_end_offset);
      *out = Slice(file_info_->file_data.data() + file_offset, len);
      return true;
    } else {
      return ReadNonMmap(file_offset, len, out);
    }
  }

  // Non-mmap implementation of Read().
  // If return false, status code is stored in status_.
  bool ReadNonMmap(uint32_t file_offset, uint32_t len, Slice* output);

  // *bytes_read = 0 means eof. false means failure and status is saved
  // in status_. Not directly returning Status to save copying status
  // object to match the previous performance of mmap mode.
  inline bool ReadVarint32(uint32_t offset, uint32_t* output,
                           uint32_t* bytes_read);

  // Non-mmap implementation of ReadVarint32().
  bool ReadVarint32NonMmap(uint32_t offset, uint32_t* output,
                           uint32_t* bytes_read);

  // Status recorded by the most recent failing read, if any.
  Status status() const { return status_; }

  // File description this reader was constructed with.
  const PlainTableReaderFileInfo* file_info() const { return file_info_; }

 private:
  const PlainTableReaderFileInfo* file_info_;

  // One cached chunk of the file.
  struct Buffer {
    Buffer() : buf_start_offset(0), buf_len(0), buf_capacity(0) {}
    std::unique_ptr<char[]> buf;  // backing storage
    uint32_t buf_start_offset;    // file offset of buf[0]
    uint32_t buf_len;             // number of valid bytes in buf
    uint32_t buf_capacity;        // allocated size of buf
  };

  // Keep buffers for two recent reads.
  std::array<std::unique_ptr<Buffer>, 2> buffers_;
  // Number of entries of buffers_ that have been allocated so far.
  uint32_t num_buf_;
  Status status_;

  // Returns a slice of `len` bytes at `file_offset`, pointing into `buf`.
  Slice GetFromBuffer(Buffer* buf, uint32_t file_offset, uint32_t len);
};

// A helper class to decode keys from input buffer
// Actual data format of the key is documented in plain_table_factory.h
class PlainTableKeyDecoder {
Expand Down Expand Up @@ -82,43 +151,7 @@ class PlainTableKeyDecoder {
Slice* internal_key, uint32_t* bytes_read,
bool* seekable = nullptr);

// Reads byte ranges of the file described by file_info_, either from the
// mmaped file contents or, in non-mmap mode, through a single internal buffer.
class FileReader {
public:
// Stores file_info as-is and starts with an empty, unallocated buffer.
explicit FileReader(const PlainTableReaderFileInfo* file_info)
: file_info_(file_info),
buf_start_offset_(0),
buf_len_(0),
buf_capacity_(0) {}
// In mmaped mode, the results point to mmaped area of the file, which
// means it is always valid before closing the file.
// In non-mmap mode, the results point to an internal buffer. If the caller
// makes another read call, the results will not be valid. So callers should
// make a copy when needed.
// If return false, status code is stored in status_.
inline bool Read(uint32_t file_offset, uint32_t len, Slice* output);

// If return false, status code is stored in status_.
bool ReadNonMmap(uint32_t file_offset, uint32_t len, Slice* output);

// *bytes_read = 0 means eof. false means failure and status is saved
// in status_. Not directly returning Status to save copying status
// object to match the previous performance of mmap mode.
inline bool ReadVarint32(uint32_t offset, uint32_t* output,
uint32_t* bytes_read);

bool ReadVarint32NonMmap(uint32_t offset, uint32_t* output,
uint32_t* bytes_read);

// Status recorded by a failing read, if any.
Status status() const { return status_; }

const PlainTableReaderFileInfo* file_info_;
// Backing storage of the read buffer.
std::unique_ptr<char[]> buf_;
// File offset corresponding to buf_[0].
uint32_t buf_start_offset_;
// Number of valid bytes currently held in buf_.
uint32_t buf_len_;
// Allocated size of buf_.
uint32_t buf_capacity_;
Status status_;
};
FileReader file_reader_;
PlainTableFileReader file_reader_;
EncodingType encoding_type_;
uint32_t prefix_len_;
uint32_t fixed_user_key_len_;
Expand Down
3 changes: 2 additions & 1 deletion table/table_reader_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
}
// verify key;
total_time += Now(env, measured_by_nanosecond) - start_time;
assert(Slice(MakeKey(r1, r2 + count, through_db)) == iter->key());
assert(Slice(MakeKey(r1, r2 + count, through_db)) ==
(through_db ? iter->key() : iiter->key()));
start_time = Now(env, measured_by_nanosecond);
if (++count >= r2_len) {
break;
Expand Down
9 changes: 8 additions & 1 deletion util/testutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,16 @@ class StringSource: public RandomAccessFile {
bool mmap = false)
: contents_(contents.data(), contents.size()),
uniq_id_(uniq_id),
mmap_(mmap) {}
mmap_(mmap),
total_reads_(0) {}

virtual ~StringSource() { }

uint64_t Size() const { return contents_.size(); }

virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
total_reads_++;
if (offset > contents_.size()) {
return Status::InvalidArgument("invalid Read offset");
}
Expand All @@ -271,10 +273,15 @@ class StringSource: public RandomAccessFile {
return static_cast<size_t>(rid-id);
}

// Number of Read() calls made on this source since construction or since the
// counter was last overwritten with set_total_reads().
int total_reads() const { return total_reads_; }

// Overwrites the Read() call counter with `tr`.
void set_total_reads(int tr) { total_reads_ = tr; }

private:
std::string contents_;
uint64_t uniq_id_;
bool mmap_;
mutable int total_reads_;
};

class NullLogger : public Logger {
Expand Down

0 comments on commit 9a8e3f7

Please sign in to comment.