Skip to content

Commit

Permalink
Abort compactions more reliably when closing DB
Browse files Browse the repository at this point in the history
Summary:
DB shutdown aborts running compactions by setting an atomic shutting_down=true that CompactionJob periodically checks. Without this PR it checks it before processing every _output_ value. If compaction filter filters everything out, the compaction is uninterruptible. This PR adds checks for shutting_down on every _input_ value (in CompactionIterator and MergeHelper).

There's also some minor code cleanup along the way.
Closes facebook#1639

Differential Revision: D4306571

Pulled By: yiwu-arbug

fbshipit-source-id: f050890
  • Loading branch information
al13n321 authored and facebook-github-bot committed Jan 11, 2017
1 parent 62384eb commit d18dd2c
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 63 deletions.
28 changes: 18 additions & 10 deletions db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@ CompactionIterator::CompactionIterator(
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
LogBuffer* log_buffer)
const std::atomic<bool>* shutting_down)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, env, expect_valid_internal_key,
range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, log_buffer) {}
compaction_filter, shutting_down) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
Expand All @@ -42,7 +43,7 @@ CompactionIterator::CompactionIterator(
range_del_agg_(range_del_agg),
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
log_buffer_(log_buffer),
shutting_down_(shutting_down),
merge_out_iter_(merge_helper_) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
bottommost_level_ =
Expand Down Expand Up @@ -136,7 +137,7 @@ void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;

while (!valid_ && input_->Valid()) {
while (!valid_ && input_->Valid() && !IsShuttingDown()) {
key_ = input_->key();
value_ = input_->value();
iter_stats_.num_input_records++;
Expand Down Expand Up @@ -217,7 +218,8 @@ void CompactionIterator::NextFromInput() {
}

if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
Expand Down Expand Up @@ -422,7 +424,6 @@ void CompactionIterator::NextFromInput() {
input_->Next();
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
status_ = Status::InvalidArgument(
"merge_operator is not properly initialized.");
return;
Expand All @@ -433,11 +434,14 @@ void CompactionIterator::NextFromInput() {
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot,
bottommost_level_);
Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
prev_snapshot, bottommost_level_);
merge_out_iter_.SeekToFirst();

if (merge_out_iter_.Valid()) {
if (!s.ok() && !s.IsMergeInProgress()) {
status_ = s;
return;
} else if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
Expand Down Expand Up @@ -481,6 +485,10 @@ void CompactionIterator::NextFromInput() {
input_->Seek(skip_until);
}
}

if (!valid_ && IsShuttingDown()) {
status_ = Status::ShutdownInProgress();
}
}

void CompactionIterator::PrepareOutput() {
Expand Down
12 changes: 8 additions & 4 deletions db/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "rocksdb/compaction_filter.h"
#include "util/log_buffer.h"

namespace rocksdb {

Expand Down Expand Up @@ -61,7 +60,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
const std::atomic<bool>* shutting_down = nullptr);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
Expand All @@ -72,7 +71,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
const std::atomic<bool>* shutting_down = nullptr);

~CompactionIterator();

Expand Down Expand Up @@ -125,7 +124,7 @@ class CompactionIterator {
RangeDelAggregator* range_del_agg_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;
const std::atomic<bool>* shutting_down_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
Expand Down Expand Up @@ -180,5 +179,10 @@ class CompactionIterator {
// is in or beyond the last file checked during the previous call
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;

bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
}
};
} // namespace rocksdb
165 changes: 140 additions & 25 deletions db/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,64 @@

namespace rocksdb {

// Expects no merging attempts.
class NoMergingMergeOp : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
ADD_FAILURE();
return false;
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
ADD_FAILURE();
return false;
}
const char* Name() const override {
return "CompactionIteratorTest NoMergingMergeOp";
}
};

// Compaction filter that gets stuck when it sees a particular key,
// then gets unstuck when told to.
// Always returns Decition::kRemove.
class StallingFilter : public CompactionFilter {
public:
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override {
int k = std::atoi(key.ToString().c_str());
last_seen.store(k);
while (k >= stall_at.load()) {
std::this_thread::yield();
}
return Decision::kRemove;
}

const char* Name() const override {
return "CompactionIteratorTest StallingFilter";
}

// Wait until the filter sees a key >= k and stalls at that key.
// If `exact`, asserts that the seen key is equal to k.
void WaitForStall(int k, bool exact = true) {
stall_at.store(k);
while (last_seen.load() < k) {
std::this_thread::yield();
}
if (exact) {
EXPECT_EQ(k, last_seen.load());
}
}

// Filter will stall on key >= stall_at. Advance stall_at to unstall.
mutable std::atomic<int> stall_at{0};
// Last key the filter was called with.
mutable std::atomic<int> last_seen{0};
};

class LoggingForwardVectorIterator : public InternalIterator {
public:
struct Action {
Expand Down Expand Up @@ -88,13 +146,15 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
virtual int level(size_t compaction_input_level) const { return 0; }
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
return false;
return key_not_exists_beyond_output_level;
}
virtual bool bottommost_level() const { return false; }
virtual int number_levels() const { return 1; }
virtual Slice GetLargestUserKey() const {
return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
}

bool key_not_exists_beyond_output_level = false;
};

class CompactionIteratorTest : public testing::Test {
Expand All @@ -116,17 +176,19 @@ class CompactionIteratorTest : public testing::Test {

std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
if (filter) {
compaction.reset(new FakeCompaction());
compaction_proxy_ = new FakeCompaction();
compaction.reset(compaction_proxy_);
}

merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, 0U, false, 0));
nullptr, 0U, false, 0, 0, nullptr,
&shutting_down_));
iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
std::move(compaction), filter));
std::move(compaction), filter, &shutting_down_));
}

void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
Expand All @@ -138,6 +200,8 @@ class CompactionIteratorTest : public testing::Test {
std::unique_ptr<LoggingForwardVectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
std::unique_ptr<RangeDelAggregator> range_del_agg_;
std::atomic<bool> shutting_down_{false};
FakeCompaction* compaction_proxy_;
};

// It is possible that the output of the compaction iterator is empty even if
Expand Down Expand Up @@ -209,26 +273,6 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
}

TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
// Expect no merging attempts.
class MergeOp : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
ADD_FAILURE();
return false;
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
ADD_FAILURE();
return false;
}
const char* Name() const override {
return "CompactionIteratorTest.CompactionFilterSkipUntil::MergeOp";
}
};

class Filter : public CompactionFilter {
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value,
Expand Down Expand Up @@ -286,7 +330,7 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
}
};

MergeOp merge_op;
NoMergingMergeOp merge_op;
Filter filter;
InitIterators(
{test::KeyStr("a", 50, kTypeValue), // keep
Expand Down Expand Up @@ -338,6 +382,77 @@ TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
ASSERT_EQ(expected_actions, iter_->log);
}

TEST_F(CompactionIteratorTest, ShuttingDownInFilter) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter);
// Don't leave tombstones (kTypeDeletion) for filtered keys.
compaction_proxy_->key_not_exists_beyond_output_level = true;

std::atomic<bool> seek_done{false};
std::thread compaction_thread([&] {
c_iter_->SeekToFirst();
EXPECT_FALSE(c_iter_->Valid());
EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
seek_done.store(true);
});

// Let key 1 through.
filter.WaitForStall(1);

// Shutdown during compaction filter call for key 2.
filter.WaitForStall(2);
shutting_down_.store(true);
EXPECT_FALSE(seek_done.load());

// Unstall filter and wait for SeekToFirst() to return.
filter.stall_at.store(3);
compaction_thread.join();
assert(seek_done.load());

// Check that filter was never called again.
EXPECT_EQ(2, filter.last_seen.load());
}

// Same as ShuttingDownInFilter, but shutdown happens during filter call for
// a merge operand, not for a value.
TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
NoMergingMergeOp merge_op;
StallingFilter filter;
InitIterators(
{test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
{"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, &merge_op, &filter);
compaction_proxy_->key_not_exists_beyond_output_level = true;

std::atomic<bool> seek_done{false};
std::thread compaction_thread([&] {
c_iter_->SeekToFirst();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());
seek_done.store(true);
});

// Let key 1 through.
filter.WaitForStall(1);

// Shutdown during compaction filter call for key 2.
filter.WaitForStall(2);
shutting_down_.store(true);
EXPECT_FALSE(seek_done.load());

// Unstall filter and wait for SeekToFirst() to return.
filter.stall_at.store(3);
compaction_thread.join();
assert(seek_done.load());

// Check that filter was never called again.
EXPECT_EQ(2, filter.last_seen.load());
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit d18dd2c

Please sign in to comment.