WritePrepared Txn: Fix bug with duplicate keys during recovery
Summary:
Fix the following bugs:
- During recovery a duplicate key was inserted twice into the write batch of the recovery transaction:
once when the memtable returned false (because the key was a duplicate) and once for the 2nd attempt. This would result in a different SubBatch count being measured when the recovered transaction commits.
- If a cf is flushed during recovery, the memtable is not available to assist in detecting the duplicate key. This could result in not advancing the sequence number when iterating over duplicate keys of a flushed cf, and hence inserting the next key with the wrong sequence number.
- SubBatchCounter would reset the comparator to the default comparator after the first duplicate key. The 2nd duplicate key would therefore go through the wrong comparator and not be detected (see the sketch below).
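To make the last two points concrete, here is a minimal, standalone sketch of per-column-family duplicate detection that keeps each CF's own comparator bound to its key set. It is illustrative only, not the patch itself (the real fix is the DuplicateDetector added to db/write_batch.cc below); the names DupSketch, IsDuplicate, and KeyCmp are invented for the example.

// Illustrative sketch only: duplicate detection without a memtable, with the
// CF's own comparator kept alive across duplicates (never reset to a default).
#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>

using KeyCmp = bool (*)(const std::string&, const std::string&);

class DupSketch {
 public:
  // Returns true if `key` was already seen for `cf` since the last sub-batch.
  bool IsDuplicate(uint32_t cf, const std::string& key, KeyCmp cmp) {
    auto it = keys_.find(cf);
    if (it == keys_.end()) {
      // Bind the CF's comparator on first use and keep it for later lookups.
      it = keys_.emplace(cf, std::set<std::string, KeyCmp>(cmp)).first;
    }
    return !it->second.insert(key).second;  // failed insert => duplicate
  }
  // A detected duplicate closes the current sub-batch; start a fresh one.
  void StartNewSubBatch() { keys_.clear(); }

 private:
  std::map<uint32_t, std::set<std::string, KeyCmp>> keys_;
};

int main() {
  KeyCmp bytewise = [](const std::string& a, const std::string& b) { return a < b; };
  DupSketch d;
  std::cout << d.IsDuplicate(0, "k1", bytewise) << "\n";  // 0: first occurrence
  std::cout << d.IsDuplicate(0, "k1", bytewise) << "\n";  // 1: duplicate detected
}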
Closes facebook#3562

Differential Revision: D7149440

Pulled By: maysamyabandeh

fbshipit-source-id: 91ec317b165f363f5d11ff8b8c47c81cebb8ed77
Maysam Yabandeh authored and facebook-github-bot committed Mar 5, 2018
1 parent 15f55e5 commit 680864a
Showing 9 changed files with 497 additions and 74 deletions.
18 changes: 14 additions & 4 deletions db/db_impl.h
@@ -550,9 +550,18 @@ class DBImpl : public DB {
WriteBatch* batch_;
// The seq number of the first key in the batch
SequenceNumber seq_;
// Number of sub-batches. A new sub-batch is created if the txn attempts to
// insert a duplicate (key, seq) into the memtable. This is currently used in
// WritePreparedTxn.
size_t batch_cnt_;
explicit RecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch, SequenceNumber seq)
: log_number_(log), name_(name), batch_(batch), seq_(seq) {}
WriteBatch* batch, SequenceNumber seq,
size_t batch_cnt)
: log_number_(log),
name_(name),
batch_(batch),
seq_(seq),
batch_cnt_(batch_cnt) {}

~RecoveredTransaction() { delete batch_; }
};
@@ -574,9 +583,10 @@ class DBImpl : public DB {
}

void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch, SequenceNumber seq) {
WriteBatch* batch, SequenceNumber seq,
size_t batch_cnt) {
recovered_transactions_[name] =
new RecoveredTransaction(log, name, batch, seq);
new RecoveredTransaction(log, name, batch, seq, batch_cnt);
MarkLogAsContainingPrepSection(log);
}

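As a quick illustration of what the batch_cnt_ introduced above records, duplicates split a single write batch into sub-batches. The counting helper and sample keys below are made up for the example; this is not RocksDB code.

// Hypothetical helper: count sub-batches in a flat list of (cf, key) ops.
// A duplicate (cf, key) cannot live in the same memtable sub-batch, so it
// closes the current sub-batch and opens a new one.
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

size_t CountSubBatches(const std::vector<std::pair<uint32_t, std::string>>& ops) {
  if (ops.empty()) return 0;
  std::set<std::pair<uint32_t, std::string>> seen;
  size_t sub_batches = 1;
  for (const auto& op : ops) {
    if (!seen.insert(op).second) {  // already present => a new sub-batch starts
      seen.clear();
      seen.insert(op);
      ++sub_batches;
    }
  }
  return sub_batches;
}
// Example: {(0,"a"), (0,"b"), (0,"a"), (0,"c")} yields 2 sub-batches.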
217 changes: 182 additions & 35 deletions db/write_batch.cc
@@ -978,6 +978,60 @@ Status WriteBatch::PopSavePoint() {
return Status::OK();
}

// TODO(myabandeh): move it to util
namespace {
// During recovery, if the memtable has been flushed we cannot rely on its help
// with duplicate key detection since the key insert will not be attempted. This
// class is used as an emulator of the memtable to tell whether insertion of a
// key/seq would have resulted in a duplicate.
class DuplicateDetector {
public:
explicit DuplicateDetector(DBImpl* db) : db_(db) {}
bool IsDuplicateKeySeq(uint32_t cf, const Slice& key, SequenceNumber seq) {
assert(seq >= batch_seq_);
if (batch_seq_ != seq) { // it is a new batch
keys_.clear();
}
batch_seq_ = seq;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
InitWithComp(cf);
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if an element already existed.
keys_.clear();
InitWithComp(cf);
keys_[cf].insert(key);
return true;
}
return false;
}

private:
SequenceNumber batch_seq_ = 0;
DBImpl* db_;
// A comparator to be used in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}

private:
const Comparator* user_comparator_;
};
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
void InitWithComp(const uint32_t cf) {
auto cmp = db_->GetColumnFamilyHandle(cf)->GetComparator();
keys_[cf] = CFKeys(SetComparator(cmp));
}
};
} // anonymous namespace

class MemTableInserter : public WriteBatch::Handler {

SequenceNumber sequence_;
@@ -1008,6 +1062,7 @@ class MemTableInserter : public WriteBatch::Handler {
bool seq_per_batch_;
// Whether the memtable write will be done only after the commit
bool write_after_commit_;
DuplicateDetector duplicate_detector_;

MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
@@ -1045,7 +1100,8 @@ class MemTableInserter : public WriteBatch::Handler {
// Write after commit currently uses one seq per key (instead of per
// batch). So seq_per_batch being false indicates write_after_commit
// approach.
write_after_commit_(!seq_per_batch) {
write_after_commit_(!seq_per_batch),
duplicate_detector_(db_) {
assert(cf_mems_);
}

@@ -1135,17 +1191,25 @@ class MemTableInserter : public WriteBatch::Handler {

Status PutCFImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType value_type) {
if (rebuilding_trx_ != nullptr) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
if (write_after_commit_) {
return Status::OK();
}
return Status::OK();
// else insert the values to the memtable right away
}

Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
MaybeAdvanceSeq();
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence there is no need for insert, but we
// still need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id,
key, sequence_);
}
MaybeAdvanceSeq(batch_boundry);
return seek_status;
}
Status ret_status;
@@ -1215,6 +1279,13 @@ class MemTableInserter : public WriteBatch::Handler {
}
}
}
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try add the key to
// the rebuilding transaction object.
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
}
// Since all Puts are logged in transaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
@@ -1248,57 +1319,102 @@ class MemTableInserter : public WriteBatch::Handler {

virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
if (write_after_commit_) {
return Status::OK();
}
return Status::OK();
// else insert the values to the memtable right away
}

Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
MaybeAdvanceSeq();
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence there is no need for insert, but we
// still need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id,
key, sequence_);
}
MaybeAdvanceSeq(batch_boundry);
return seek_status;
}

return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try add the key to
// the rebuilding transaction object.
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
}
return ret_status;
}

virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
if (write_after_commit_) {
return Status::OK();
}
return Status::OK();
// else insert the values to the memtable right away
}

Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
MaybeAdvanceSeq();
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence there is no need for insert, but we
// still need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
key);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id,
key, sequence_);
}
MaybeAdvanceSeq(batch_boundry);
return seek_status;
}

return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
auto ret_status =
DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try add the key to
// the rebuilding transaction object.
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
}
return ret_status;
}

virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) override {
if (rebuilding_trx_ != nullptr) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key);
if (write_after_commit_) {
return Status::OK();
}
return Status::OK();
// else insert the values to the memtable right away
}

Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
MaybeAdvanceSeq();
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence there is no need for insert, but we
// still need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key);
// TODO(myabandeh): when transactional DeleteRange support is added,
// check if end_key must also be added.
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(
column_family_id, begin_key, sequence_);
}
MaybeAdvanceSeq(batch_boundry);
return seek_status;
}
if (db_ != nullptr) {
@@ -1315,23 +1431,42 @@ class MemTableInserter : public WriteBatch::Handler {
}
}

return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
auto ret_status =
DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try add the key to
// the rebuilding transaction object.
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key);
}
return ret_status;
}

virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
assert(!concurrent_memtable_writes_);
if (rebuilding_trx_ != nullptr) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
if (write_after_commit_) {
return Status::OK();
}
return Status::OK();
// else insert the values to the memtable right away
}

Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
MaybeAdvanceSeq();
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence there is no need for insert, but we
// still need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
value);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id,
key, sequence_);
}
MaybeAdvanceSeq(batch_boundry);
return seek_status;
}

@@ -1412,6 +1547,13 @@ class MemTableInserter : public WriteBatch::Handler {
}
}

// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try add the key to
// the rebuilding transaction object.
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
}
MaybeAdvanceSeq();
CheckMemtableFull();
return ret_status;
@@ -1466,8 +1608,13 @@ class MemTableInserter : public WriteBatch::Handler {

if (recovering_log_number_ != 0) {
assert(db_->allow_2pc());
size_t batch_cnt =
write_after_commit_
? 0 // 0 will disable further checks
: static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_, rebuilding_trx_seq_);
rebuilding_trx_, rebuilding_trx_seq_,
batch_cnt);
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
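A worked example of the batch_cnt computed above when the rebuilt transaction is handed to InsertRecoveredTransaction: under seq_per_batch, each sub-batch consumes one sequence number, so the count follows from the sequence span. The concrete numbers below are assumed for illustration, not taken from the patch.

#include <cassert>
#include <cstddef>
#include <cstdint>

int main() {
  // Hypothetical recovered transaction: first key rebuilt at seq 100, and two
  // duplicate-key boundaries advanced the sequence twice while rebuilding.
  uint64_t rebuilding_trx_seq = 100;  // seq of the first key in the batch
  uint64_t sequence = 102;            // seq after the last sub-batch was handled
  size_t batch_cnt = static_cast<size_t>(sequence - rebuilding_trx_seq + 1);
  assert(batch_cnt == 3);  // must later match SubBatchCnt() of the rebuilt batch
  return 0;
}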
1 change: 1 addition & 0 deletions include/rocksdb/utilities/write_batch_with_index.h
@@ -227,6 +227,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
void SetMaxBytes(size_t max_bytes) override;

private:
friend class PessimisticTransactionDB;
friend class WritePreparedTxn;
friend class WriteBatchWithIndex_SubBatchCnt_Test;
// Returns the number of sub-batches inside the write batch. A sub-batch
5 changes: 5 additions & 0 deletions utilities/transactions/pessimistic_transaction_db.cc
@@ -142,6 +142,11 @@ Status PessimisticTransactionDB::Initialize(
}

s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_);
// WriteCommitted sets this to 0 to disable this check, which is specific to
// WritePrepared txns
assert(recovered_trx->batch_cnt_ == 0 ||
real_trx->GetWriteBatch()->SubBatchCnt() ==
recovered_trx->batch_cnt_);
real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) {
break;
1 change: 1 addition & 0 deletions utilities/transactions/pessimistic_transaction_db.h
@@ -135,6 +135,7 @@ class PessimisticTransactionDB : public TransactionDB {
friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock;
friend class TransactionTest_DoubleEmptyWrite_Test;
friend class TransactionTest_DuplicateKeys_Test;
friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
friend class TransactionTest_TwoPhaseLongPrepareTest_Test;
friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;