Skip to content

Commit

Permalink
Avoid user key copying for Get/Put/Write with user-timestamp (faceboo…
Browse files Browse the repository at this point in the history
…k#5502)

Summary:
In previous facebook#5079, we added user-specified timestamp to `DB::Get()` and `DB::Put()`. Limitation is that these two functions may cause extra memory allocation and key copy. The reason is that `WriteBatch` does not allocate extra memory for timestamps because it is not aware of timestamp size, and we did not provide an API to assign/update timestamp of each key within a `WriteBatch`.
We address these issues in this PR by doing the following.
1. Add a `timestamp_size_` to `WriteBatch` so that `WriteBatch` can take timestamps into account when calling `WriteBatch::Put`, `WriteBatch::Delete`, etc.
2. Add APIs `WriteBatch::AssignTimestamp` and `WriteBatch::AssignTimestamps` so that application can assign/update timestamps for each key in a `WriteBatch`.
3. Avoid key copy in `GetImpl` by adding new constructor to `LookupKey`.

Test plan (on devserver):
```
$make clean && COMPILE_WITH_ASAN=1 make -j32 all
$./db_basic_test --gtest_filter=Timestamp/DBBasicTestWithTimestampWithParam.PutAndGet/*
$make check
```
If the API extension looks good, I will add more unit tests.

Some simple benchmark using db_bench.
```
$rm -rf /dev/shm/dbbench/* && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillseq,readrandom -num=1000000
$rm -rf /dev/shm/dbbench/* && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -num=1000000 -disable_wal=true
```
Master is at a78503b.
```
|        | readrandom | fillrandom |
| master | 15.53 MB/s | 25.97 MB/s |
| PR5502 | 16.70 MB/s | 25.80 MB/s |
```
Pull Request resolved: facebook#5502

Differential Revision: D16340894

Pulled By: riversand963

fbshipit-source-id: 51132cf792be07d1efc3ac33f5768c4ee2608bb8
  • Loading branch information
riversand963 authored and facebook-github-bot committed Jul 25, 2019
1 parent 0d16fad commit ae152ee
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ rocksdb_undump
db_test2
trace_analyzer
trace_analyzer_test
block_cache_trace_analyzer
.DS_Store

java/out
Expand Down
13 changes: 2 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,16 +1441,7 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
if (nullptr == read_options.timestamp) {
return GetImpl(read_options, column_family, key, value);
}
Slice akey;
std::string buf;
Status s = AppendTimestamp(key, *(read_options.timestamp), &akey, &buf);
if (s.ok()) {
s = GetImpl(read_options, column_family, akey, value);
}
return s;
return GetImpl(read_options, column_family, key, value);
}

Status DBImpl::GetImpl(const ReadOptions& read_options,
Expand Down Expand Up @@ -1528,7 +1519,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);

bool skip_memtable = (read_options.read_tier == kPersistedTier &&
Expand Down
12 changes: 7 additions & 5 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,14 +1734,16 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
}
return Write(opt, &batch);
}
Slice akey;
std::string buf;
Status s = AppendTimestamp(key, *(opt.timestamp), &akey, &buf);
const Slice* ts = opt.timestamp;
assert(nullptr != ts);
size_t ts_sz = ts->size();
WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
ts_sz);
Status s = batch.Put(column_family, key, value);
if (!s.ok()) {
return s;
}
WriteBatch batch(akey.size() + value.size() + 24);
s = batch.Put(column_family, akey, value);
s = batch.AssignTimestamp(*ts);
if (!s.ok()) {
return s;
}
Expand Down
12 changes: 9 additions & 3 deletions db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
}
}

LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s,
const Slice* ts) {
size_t usize = _user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t ts_sz = (nullptr == ts) ? 0 : ts->size();
size_t needed = usize + ts_sz + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
Expand All @@ -170,10 +172,14 @@ LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
}
start_ = dst;
// NOTE: We don't support users keys of more than 2GB :)
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + ts_sz + 8));
kstart_ = dst;
memcpy(dst, _user_key.data(), usize);
dst += usize;
if (nullptr != ts) {
memcpy(dst, ts->data(), ts_sz);
dst += ts_sz;
}
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
Expand Down
16 changes: 0 additions & 16 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,20 +669,4 @@ struct ParsedInternalKeyComparator {
const InternalKeyComparator* cmp;
};

// TODO (yanqin): this causes extra memory allocation and copy. Should be
// addressed in the future.
inline Status AppendTimestamp(const Slice& key, const Slice& timestamp,
Slice* ret_key, std::string* ret_buf) {
assert(ret_key != nullptr);
assert(ret_buf != nullptr);
if (key.data() + key.size() == timestamp.data()) {
*ret_key = Slice(key.data(), key.size() + timestamp.size());
} else {
ret_buf->assign(key.data(), key.size());
ret_buf->append(timestamp.data(), timestamp.size());
*ret_key = Slice(*ret_buf);
}
return Status::OK();
}

} // namespace rocksdb
3 changes: 2 additions & 1 deletion db/lookup_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& _user_key, SequenceNumber sequence);
LookupKey(const Slice& _user_key, SequenceNumber sequence,
const Slice* ts = nullptr);

~LookupKey();

Expand Down
146 changes: 139 additions & 7 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,121 @@ struct BatchContentClassifier : public WriteBatch::Handler {
}
};

class TimestampAssigner : public WriteBatch::Handler {
public:
explicit TimestampAssigner(const Slice& ts)
: timestamp_(ts), timestamps_(kEmptyTimestampList) {}
explicit TimestampAssigner(const std::vector<Slice>& ts_list)
: timestamps_(ts_list) {
SanityCheck();
}
~TimestampAssigner() override {}

Status PutCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}

Status DeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}

Status SingleDeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}

Status DeleteRangeCF(uint32_t, const Slice& begin_key,
const Slice& end_key) override {
AssignTimestamp(begin_key);
AssignTimestamp(end_key);
++idx_;
return Status::OK();
}

Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}

Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
// TODO (yanqin): support blob db in the future.
return Status::OK();
}

Status MarkBeginPrepare(bool) override {
// TODO (yanqin): support in the future.
return Status::OK();
}

Status MarkEndPrepare(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}

Status MarkCommit(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}

Status MarkRollback(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}

private:
void SanityCheck() const {
assert(!timestamps_.empty());
#ifndef NDEBUG
const size_t ts_sz = timestamps_[0].size();
for (size_t i = 1; i != timestamps_.size(); ++i) {
assert(ts_sz == timestamps_[i].size());
}
#endif // !NDEBUG
}

void AssignTimestamp(const Slice& key) {
assert(timestamps_.empty() || idx_ < timestamps_.size());
const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
size_t ts_sz = ts.size();
char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
memcpy(ptr, ts.data(), ts_sz);
}

static const std::vector<Slice> kEmptyTimestampList;
const Slice timestamp_;
const std::vector<Slice>& timestamps_;
size_t idx_ = 0;

// No copy or move.
TimestampAssigner(const TimestampAssigner&) = delete;
TimestampAssigner(TimestampAssigner&&) = delete;
TimestampAssigner& operator=(const TimestampAssigner&) = delete;
TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;

} // anon namespace

struct SavePoints {
std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: content_flags_(0), max_bytes_(max_bytes), rep_() {
: content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
? reserved_bytes
: WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
: content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
reserved_bytes : WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
Expand All @@ -151,18 +258,21 @@ WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
WriteBatch::WriteBatch(const std::string& rep)
: content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
rep_(rep) {}
rep_(rep),
timestamp_size_(0) {}

WriteBatch::WriteBatch(std::string&& rep)
: content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
rep_(std::move(rep)) {}
rep_(std::move(rep)),
timestamp_size_(0) {}

WriteBatch::WriteBatch(const WriteBatch& src)
: wal_term_point_(src.wal_term_point_),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(src.rep_) {
rep_(src.rep_),
timestamp_size_(src.timestamp_size_) {
if (src.save_points_ != nullptr) {
save_points_.reset(new SavePoints());
save_points_->stack = src.save_points_->stack;
Expand All @@ -174,7 +284,8 @@ WriteBatch::WriteBatch(WriteBatch&& src) noexcept
wal_term_point_(std::move(src.wal_term_point_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(std::move(src.rep_)) {}
rep_(std::move(src.rep_)),
timestamp_size_(src.timestamp_size_) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
if (&src != this) {
Expand Down Expand Up @@ -643,7 +754,14 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
if (0 == b->timestamp_size_) {
PutLengthPrefixedSlice(&b->rep_, key);
} else {
PutVarint32(&b->rep_,
static_cast<uint32_t>(key.size() + b->timestamp_size_));
b->rep_.append(key.data(), key.size());
b->rep_.append(b->timestamp_size_, '\0');
}
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
Expand Down Expand Up @@ -692,7 +810,11 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
if (0 == b->timestamp_size_) {
PutLengthPrefixedSliceParts(&b->rep_, key);
} else {
PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
}
PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
Expand Down Expand Up @@ -1038,6 +1160,16 @@ Status WriteBatch::PopSavePoint() {
return Status::OK();
}

Status WriteBatch::AssignTimestamp(const Slice& ts) {
TimestampAssigner ts_assigner(ts);
return Iterate(&ts_assigner);
}

Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
TimestampAssigner ts_assigner(ts_list);
return Iterate(&ts_assigner);
}

class MemTableInserter : public WriteBatch::Handler {

SequenceNumber sequence_;
Expand Down
9 changes: 9 additions & 0 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <atomic>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/status.h"
#include "rocksdb/write_batch_base.h"

Expand Down Expand Up @@ -60,6 +61,7 @@ struct SavePoint {
class WriteBatch : public WriteBatchBase {
public:
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz);
~WriteBatch() override;

using WriteBatchBase::Put;
Expand Down Expand Up @@ -311,6 +313,12 @@ class WriteBatch : public WriteBatchBase {
// Returns trie if MarkRollback will be called during Iterate
bool HasRollback() const;

// Assign timestamp to write batch
Status AssignTimestamp(const Slice& ts);

// Assign timestamps to write batch
Status AssignTimestamps(const std::vector<Slice>& ts_list);

using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override { return this; }

Expand Down Expand Up @@ -361,6 +369,7 @@ class WriteBatch : public WriteBatchBase {

protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_
const size_t timestamp_size_;

// Intentionally copyable
};
Expand Down
16 changes: 14 additions & 2 deletions util/coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ extern void PutVarint32Varint32Varint64(std::string* dst, uint32_t value1,
extern void PutLengthPrefixedSlice(std::string* dst, const Slice& value);
extern void PutLengthPrefixedSliceParts(std::string* dst,
const SliceParts& slice_parts);
extern void PutLengthPrefixedSlicePartsWithPadding(
std::string* dst, const SliceParts& slice_parts, size_t pad_sz);

// Standard Get... routines parse a value from the beginning of a Slice
// and advance the slice past the parsed value.
Expand Down Expand Up @@ -306,9 +308,8 @@ inline void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
dst->append(value.data(), value.size());
}

inline void PutLengthPrefixedSliceParts(std::string* dst,
inline void PutLengthPrefixedSliceParts(std::string* dst, size_t total_bytes,
const SliceParts& slice_parts) {
size_t total_bytes = 0;
for (int i = 0; i < slice_parts.num_parts; ++i) {
total_bytes += slice_parts.parts[i].size();
}
Expand All @@ -318,6 +319,17 @@ inline void PutLengthPrefixedSliceParts(std::string* dst,
}
}

inline void PutLengthPrefixedSliceParts(std::string* dst,
const SliceParts& slice_parts) {
PutLengthPrefixedSliceParts(dst, /*total_bytes=*/0, slice_parts);
}

inline void PutLengthPrefixedSlicePartsWithPadding(
std::string* dst, const SliceParts& slice_parts, size_t pad_sz) {
PutLengthPrefixedSliceParts(dst, /*total_bytes=*/pad_sz, slice_parts);
dst->append(pad_sz, '\0');
}

inline int VarintLength(uint64_t v) {
int len = 1;
while (v >= 128) {
Expand Down

0 comments on commit ae152ee

Please sign in to comment.