Skip to content

Commit

Permalink
WriteUnPrepared: Fix bug in savepoints (facebook#5703)
Browse files Browse the repository at this point in the history
Summary:
Fix a bug in write unprepared savepoints. When flushing the write batch according to savepoint boundaries, we were forgetting to flush the last write batch after the last savepoint, meaning that some data was not written to DB.

Also, add a small optimization where we avoid flushing empty batches.
Pull Request resolved: facebook#5703

Differential Revision: D16811996

Pulled By: lth

fbshipit-source-id: 600c7e0e520ad7a8fad32d77e11d932453e68e3f
  • Loading branch information
lth authored and facebook-github-bot committed Aug 14, 2019
1 parent 0a97125 commit 7785f61
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
20 changes: 20 additions & 0 deletions utilities/transactions/write_unprepared_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,26 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
}
}

TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
WriteOptions woptions;
TransactionOptions txn_options;
txn_options.write_batch_flush_threshold = 1;

Transaction* txn = db->BeginTransaction(woptions, txn_options);
txn->SetSavePoint();
ASSERT_OK(txn->Put("a", "a"));
ASSERT_OK(txn->Put("b", "b"));
ASSERT_OK(txn->Commit());

ReadOptions roptions;
std::string value;
ASSERT_OK(txn->Get(roptions, "a", &value));
ASSERT_EQ(value, "a");
ASSERT_OK(txn->Get(roptions, "b", &value));
ASSERT_EQ(value, "b");
delete txn;
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
30 changes: 18 additions & 12 deletions utilities/transactions/write_unprepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,13 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {

size_t prev_boundary = WriteBatchInternal::kHeader;
const bool kPrepared = true;
for (size_t i = 0; i < unflushed_save_points_->size(); i++) {
for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
bool trailing_batch = i == unflushed_save_points_->size();
SavePointBatchHandler sp_handler(&write_batch_,
*wupt_db_->GetCFHandleMap().get());
size_t curr_boundary = (*unflushed_save_points_)[i];
size_t curr_boundary = trailing_batch
? wb.GetWriteBatch()->GetDataSize()
: (*unflushed_save_points_)[i];

// Construct the partial write batch up to the savepoint.
//
Expand All @@ -424,18 +427,22 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
return s;
}

// Flush the write batch.
s = FlushWriteBatchToDBInternal(!kPrepared);
if (!s.ok()) {
return s;
if (write_batch_.GetWriteBatch()->Count() > 0) {
// Flush the write batch.
s = FlushWriteBatchToDBInternal(!kPrepared);
if (!s.ok()) {
return s;
}
}

if (flushed_save_points_ == nullptr) {
flushed_save_points_.reset(
new autovector<WriteUnpreparedTxn::SavePoint>());
if (!trailing_batch) {
if (flushed_save_points_ == nullptr) {
flushed_save_points_.reset(
new autovector<WriteUnpreparedTxn::SavePoint>());
}
flushed_save_points_->emplace_back(
unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
}
flushed_save_points_->emplace_back(
unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));

prev_boundary = curr_boundary;
const bool kClear = true;
Expand Down Expand Up @@ -736,7 +743,6 @@ Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
assert(flushed_save_points_->size() > 0);
WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

assert(top.unprep_seqs_.size() > 0);
assert(save_points_ != nullptr && save_points_->size() > 0);
const TransactionKeyMap& tracked_keys = save_points_->top().new_keys_;

Expand Down

0 comments on commit 7785f61

Please sign in to comment.