Skip to content

Commit

Permalink
WriteUnPrepared: savepoint support (facebook#5627)
Browse files Browse the repository at this point in the history
Summary:
Add savepoint support when the current transaction has flushed unprepared batches.

Rolling back to a savepoint is similar to rolling back a transaction. It requires collecting the set of keys that have changed since the savepoint, re-reading those keys at the snapshot taken at that savepoint, and then restoring the old values by writing out another unprepared batch.

For this strategy to work though, we must be capable of reading keys at a savepoint. This does not work if keys were written out using the same sequence number before and after a savepoint. Therefore, when we flush out unprepared batches, we must split the batch by savepoint if any savepoints exist.

For example, if we have the following:
```
Put(A)
Put(B)
Put(C)
SetSavePoint()
Put(D)
Put(E)
SetSavePoint()
Put(F)
```

Then we will write out 3 separate unprepared batches (each Put is shown with the sequence number of the batch it is written in):
```
Put(A) 1
Put(B) 1
Put(C) 1
Put(D) 2
Put(E) 2
Put(F) 3
```

This is so that when we roll back to, e.g., the first savepoint, we can just read keys at snapshot_seq = 1.
Pull Request resolved: facebook#5627

Differential Revision: D16584130

Pulled By: lth

fbshipit-source-id: 6d100dd548fb20c4b76661bd0f8a2647e64477fa
  • Loading branch information
lth authored and facebook-github-bot committed Jul 31, 2019
1 parent d599135 commit f622ca2
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 29 deletions.
48 changes: 31 additions & 17 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,25 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
}

Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < WriteBatchInternal::kHeader) {
if (rep_.size() < WriteBatchInternal::kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}

input.remove_prefix(WriteBatchInternal::kHeader);
return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
rep_.size());
}

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
WriteBatch::Handler* handler, size_t begin,
size_t end) {
if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
return Status::Corruption("Invalid start/end bounds for Iterate");
}
assert(begin <= end);
Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
bool whole_batch =
(begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

Slice key, value, blob, xid;
// Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
// the batch boundary symbols otherwise we would mis-count the number of
Expand Down Expand Up @@ -547,7 +560,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
} else {
assert(s.IsTryAgain());
assert(!last_was_try_again); // to detect infinite loop bugs
assert(!last_was_try_again); // to detect infinite loop bugs
if (UNLIKELY(last_was_try_again)) {
return Status::Corruption(
"two consecutive TryAgain in WriteBatch handler; this is either a "
Expand All @@ -560,7 +573,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value);
if (LIKELY(s.ok())) {
Expand All @@ -570,7 +583,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
if (LIKELY(s.ok())) {
Expand All @@ -580,7 +593,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key);
if (LIKELY(s.ok())) {
Expand All @@ -590,7 +603,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value);
if (LIKELY(s.ok())) {
Expand All @@ -600,7 +613,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value);
if (LIKELY(s.ok())) {
Expand All @@ -610,7 +623,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeColumnFamilyBlobIndex:
case kTypeBlobIndex:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutBlobIndexCF(column_family, key, value);
if (LIKELY(s.ok())) {
Expand All @@ -623,7 +636,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
empty_batch = false;
break;
case kTypeBeginPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
empty_batch = false;
Expand All @@ -642,7 +655,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
break;
case kTypeBeginPersistedPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
empty_batch = false;
Expand All @@ -655,7 +668,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
break;
case kTypeBeginUnprepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
handler->MarkBeginPrepare(true /* unprepared */);
empty_batch = false;
Expand All @@ -674,19 +687,19 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
break;
case kTypeEndPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid);
empty_batch = true;
break;
case kTypeCommitXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid);
empty_batch = true;
break;
case kTypeRollbackXID:
assert(content_flags_.load(std::memory_order_relaxed) &
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid);
empty_batch = true;
Expand All @@ -702,7 +715,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!s.ok()) {
return s;
}
if (handler_continue && found != WriteBatchInternal::Count(this)) {
if (handler_continue && whole_batch &&
found != WriteBatchInternal::Count(wb)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
Expand Down
4 changes: 4 additions & 0 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class WriteBatchInternal {
// leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);

// Iterate over [begin, end) range of a write batch
static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
size_t begin, size_t end);

// This write batch includes the latest state that should be persisted. Such
// state is meant to be used only during recovery.
static void SetAsLastestPersistentState(WriteBatch* b);
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
size_t max_bytes = 0);

~WriteBatchWithIndex() override;
WriteBatchWithIndex(WriteBatchWithIndex&&);
WriteBatchWithIndex& operator=(WriteBatchWithIndex&&);

using WriteBatchBase::Put;
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class WriteBatch : public WriteBatchBase {
virtual bool Continue();

protected:
friend class WriteBatch;
friend class WriteBatchInternal;
virtual bool WriteAfterCommit() const { return true; }
virtual bool WriteBeforePrepare() const { return false; }
};
Expand Down
4 changes: 2 additions & 2 deletions utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
if (dbimpl_->allow_2pc()) {
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
InitWriteBatch();
}
}

Expand All @@ -49,7 +49,7 @@ void TransactionBaseImpl::Clear() {
num_merges_ = 0;

if (dbimpl_->allow_2pc()) {
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
InitWriteBatch();
}
}

Expand Down
20 changes: 16 additions & 4 deletions utilities/transactions/transaction_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <vector>

#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
Expand Down Expand Up @@ -273,6 +274,15 @@ class TransactionBaseImpl : public Transaction {
// Sets a snapshot if SetSnapshotOnNextOperation() has been called.
void SetSnapshotIfNeeded();

// Initialize write_batch_ for 2PC by inserting Noop.
// If `clear` is true, write_batch_ is cleared first. In either case the batch
// is asserted to have a data size equal to WriteBatchInternal::kHeader before
// the Noop is inserted.
inline void InitWriteBatch(bool clear = false) {
if (clear) {
write_batch_.Clear();
}
assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
}

DB* db_;
DBImpl* dbimpl_;

Expand Down Expand Up @@ -325,16 +335,18 @@ class TransactionBaseImpl : public Transaction {
// Optimistic Transactions will wait till commit time to do conflict checking.
TransactionKeyMap tracked_keys_;

// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
autovector<TransactionBaseImpl::SavePoint>>>
save_points_;

private:
friend class WritePreparedTxn;
// Extra data to be persisted with the commit. Note this is only used when
// prepare phase is not skipped.
WriteBatch commit_time_batch_;

// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>> save_points_;

// If true, future Put/Merge/Deletes will be indexed in the
// WriteBatchWithIndex.
// If false, future Put/Merge/Deletes will be inserted directly into the
Expand Down
Loading

0 comments on commit f622ca2

Please sign in to comment.