Skip to content

Commit

Permalink
[RocksDB] Fix a race condition in GetSortedWalFiles
Browse files Browse the repository at this point in the history
Summary: This patch fixed a race condition where a log file is moved to archived dir in the middle of GetSortedWalFiles. Without the fix, the log file would be missed in the result, which leads to transaction log iterator gap. A test utility SyncPoint is added to help reproducing the race condition.

Test Plan: TransactionLogIteratorRace; make check

Reviewers: dhruba, ljin

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17121
  • Loading branch information
haoboxu committed Apr 3, 2014
1 parent d1d19f5 commit 48bc0c6
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 19 deletions.
48 changes: 42 additions & 6 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "rocksdb/env.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"

namespace rocksdb {

Expand Down Expand Up @@ -95,20 +96,55 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
}

Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in archive dir, then append sorted files from main
// dir to maintain sorted order
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}

// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1");
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2");

files.clear();
// list wal files in archive dir.
Status s;
std::string archivedir = ArchivalDirectory(options_.wal_dir);
if (env_->FileExists(archivedir)) {
s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile);
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
// list wal files in main db dir.
return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile);

uint64_t latest_archived_log_number = 0;
if (!files.empty()) {
latest_archived_log_number = files.back()->LogNumber();
Log(options_.info_log, "Latest Archived log: %lu",
latest_archived_log_number);
}

files.reserve(files.size() + logs.size());
for (auto& log : logs) {
if (log->LogNumber() > latest_archived_log_number) {
files.push_back(std::move(log));
} else {
// When the race condition happens, we could see the
// same log in both db dir and archived dir. Simply
// ignore the one in db dir. Note that, if we read
// archived dir first, we would have missed the log file.
Log(options_.info_log, "%s already moved to archive",
log->PathName().c_str());
}
}

return s;
}

}
19 changes: 9 additions & 10 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

Expand Down Expand Up @@ -872,7 +873,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
if (type == kLogFile &&
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2");
Log(options_.info_log,
"Move log file %s to %s -- %s\n",
fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
Expand Down Expand Up @@ -1020,7 +1025,7 @@ void DBImpl::PurgeObsoleteWALFiles() {

size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs;
AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);

if (files_del_num > archived_logs.size()) {
Log(options_.info_log, "Trying to delete more archived log files than "
Expand Down Expand Up @@ -1791,20 +1796,14 @@ struct CompareLogByPointer {
}
};

Status DBImpl::AppendSortedWalsOfType(const std::string& path,
Status DBImpl::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files, WalFileType log_type) {
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) {
return status;
}
log_files.reserve(log_files.size() + all_files.size());
VectorLogPtr::iterator pos_start;
if (!log_files.empty()) {
pos_start = log_files.end() - 1;
} else {
pos_start = log_files.begin();
}
log_files.reserve(all_files.size());
for (const auto& f : all_files) {
uint64_t number;
FileType type;
Expand All @@ -1830,7 +1829,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
}
}
CompareLogByPointer compare_log_files;
std::sort(pos_start, log_files.end(), compare_log_files);
std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status;
}

Expand Down
6 changes: 3 additions & 3 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ class DBImpl : public DB {

void PurgeObsoleteWALFiles();

Status AppendSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType type);
Status GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType type);

// Requires: all_logs should be sorted with earliest log file first
// Retains all log files in all_logs which contain updates with seq no.
Expand Down
46 changes: 46 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "util/mutexlock.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/sync_point.h"
#include "util/testutil.h"

namespace rocksdb {
Expand Down Expand Up @@ -5189,6 +5190,51 @@ TEST(DBTest, TransactionLogIterator) {
} while (ChangeCompactOptions());
}

TEST(DBTest, TransactionLogIteratorRace) {
// Setup sync point dependency to reproduce the race condition of
// a log file moved to archived dir, in the middle of GetSortedWalFiles
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{ { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" },
{ "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" },
});

do {
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key2", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key3", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key4", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);

{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(4, iter);
}

rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// trigger async flush, and log move. Well, log move will
// wait until the GetSortedWalFiles:1 to reproduce the race
// condition
FlushOptions flush_options;
flush_options.wait = false;
dbfull()->Flush(flush_options);

// "key5" would be written in a new memtable and log
Put("key5", DummyString(1024));
{
// this iter would miss "key4" if not fixed
auto iter = OpenTransactionLogIter(0);
ExpectRecords(5, iter);
}
} while (ChangeCompactOptions());
}

TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
do {
Options options = OptionsForLogIterTest();
Expand Down
62 changes: 62 additions & 0 deletions util/sync_point.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "util/sync_point.h"

namespace rocksdb {

SyncPoint* SyncPoint::GetInstance() {
static SyncPoint sync_point;
return &sync_point;
}

void SyncPoint::LoadDependency(const std::vector<Dependency>& dependencies) {
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
}

bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
for (const auto& pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
}

void SyncPoint::EnableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = true;
}

void SyncPoint::DisableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = false;
}

void SyncPoint::ClearTrace() {
std::unique_lock<std::mutex> lock(mutex_);
cleared_points_.clear();
}

void SyncPoint::Process(const std::string& point) {
std::unique_lock<std::mutex> lock(mutex_);

if (!enabled_) return;

while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
}

cleared_points_.insert(point);
cv_.notify_all();
}

} // namespace rocksdb
79 changes: 79 additions & 0 deletions util/sync_point.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once

#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_set>
#include <unordered_map>
#include <vector>

namespace rocksdb {

// This class provides facility to reproduce race conditions deterministically
// in unit tests.
// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
// Each sync point represents a position in the execution stream of a thread.
// In the unit test, 'Happens After' relationship among sync points could be
// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of
// threads execution.
// Refer to (DBTest,TransactionLogIteratorRace), for an exmaple use case.

class SyncPoint {
public:
static SyncPoint* GetInstance();

struct Dependency {
std::string predecessor;
std::string successor;
};
// call once at the beginning of a test to setup the dependency between
// sync points
void LoadDependency(const std::vector<Dependency>& dependencies);

// enable sync point processing (disabled on startup)
void EnableProcessing();

// disable sync point processing
void DisableProcessing();

// remove the execution trace of all sync points
void ClearTrace();

// triggered by TEST_SYNC_POINT, blocking execution until all predecessors
// are executed.
void Process(const std::string& point);

// TODO: it might be useful to provide a function that blocks until all
// sync points are cleared.

private:
bool PredecessorsAllCleared(const std::string& point);

// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;

std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
bool enabled_ = false;
};

} // namespace rocksdb

// Use TEST_SYNC_POINT to specify sync points inside code base.
// Sync points can have happens-after depedency on other sync points,
// configured at runtime via SyncPoint::LoadDependency. This could be
// utilized to re-produce race conditions between threads.
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is no op in release build.
#ifdef NDEBUG
#define TEST_SYNC_POINT(x)
#else
#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
#endif

0 comments on commit 48bc0c6

Please sign in to comment.