Skip to content

Commit

Permalink
Allow users to stop manual compactions (facebook#3971)
Browse files Browse the repository at this point in the history
Summary:
Manual compaction may bring in very high load because sometime the amount of data involved in a compaction could be large, which may affect online service. So it would be good if the running compaction making the server busy can be stopped immediately. In this implementation, stopping manual compaction condition is only checked in slow process. We let deletion compaction and trivial move go through.
Pull Request resolved: facebook#3971

Test Plan: add tests at more spots.

Differential Revision: D17369043

fbshipit-source-id: 575a624fb992ce0bb07d9443eb209e547740043c
  • Loading branch information
qingping209 authored and facebook-github-bot committed Sep 17, 2019
1 parent f5a59c4 commit 6226830
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 14 deletions.
17 changes: 13 additions & 4 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
SnapshotListFetchCallback* snap_list_callback)
SnapshotListFetchCallback* snap_list_callback,
const std::atomic<bool>* manual_compaction_paused)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum,
snap_list_callback) {}
snap_list_callback,
manual_compaction_paused) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
Expand All @@ -59,7 +61,8 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
SnapshotListFetchCallback* snap_list_callback)
SnapshotListFetchCallback* snap_list_callback,
const std::atomic<bool>* manual_compaction_paused)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
Expand All @@ -73,6 +76,7 @@ CompactionIterator::CompactionIterator(
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
Expand Down Expand Up @@ -234,7 +238,8 @@ void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;

while (!valid_ && input_->Valid() && !IsShuttingDown()) {
while (!valid_ && input_->Valid() && !IsPausingManualCompaction() &&
!IsShuttingDown()) {
key_ = input_->key();
value_ = input_->value();
iter_stats_.num_input_records++;
Expand Down Expand Up @@ -612,6 +617,10 @@ void CompactionIterator::NextFromInput() {
if (!valid_ && IsShuttingDown()) {
status_ = Status::ShutdownInProgress();
}

if (IsPausingManualCompaction()) {
status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
}

void CompactionIterator::PrepareOutput() {
Expand Down
13 changes: 11 additions & 2 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class CompactionIterator {
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
SnapshotListFetchCallback* snap_list_callback = nullptr);
SnapshotListFetchCallback* snap_list_callback = nullptr,
const std::atomic<bool>* manual_compaction_paused = nullptr);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
Expand All @@ -132,7 +133,8 @@ class CompactionIterator {
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
SnapshotListFetchCallback* snap_list_callback = nullptr);
SnapshotListFetchCallback* snap_list_callback = nullptr,
const std::atomic<bool>* manual_compaction_paused = nullptr);

~CompactionIterator();

Expand Down Expand Up @@ -213,6 +215,7 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_;
bool valid_ = false;
Expand Down Expand Up @@ -279,5 +282,11 @@ class CompactionIterator {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
}

bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed);
}
};
} // namespace rocksdb
23 changes: 20 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ CompactionJob::CompactionJob(
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback)
Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback,
const std::atomic<bool>* manual_compaction_paused)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
Expand All @@ -324,6 +325,7 @@ CompactionJob::CompactionJob(
env_->OptimizeForCompactionTableRead(env_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
log_buffer_(log_buffer),
db_directory_(db_directory),
Expand Down Expand Up @@ -867,9 +869,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get(), shutting_down_);
db_options_.statistics.get());

TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void *>(
const_cast<std::atomic<bool> *>(manual_compaction_paused_)));

Slice* start = sub_compact->start;
Slice* end = sub_compact->end;
Expand All @@ -889,7 +894,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&range_del_agg, sub_compact->compaction, compaction_filter,
shutting_down_, preserve_deletes_seqnum_,
// Currently range_del_agg is incompatible with snapshot refresh feature.
range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr));
range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr,
manual_compaction_paused_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
Expand Down Expand Up @@ -953,7 +959,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
input_status = input->status();
output_file_ended = true;
}
TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void *>(
const_cast<std::atomic<bool> *>(manual_compaction_paused_)));
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
break;
}
if (!output_file_ended && c_iter->Valid() &&
sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(c_iter->key(),
Expand Down Expand Up @@ -1006,6 +1018,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
shutting_down_->load(std::memory_order_relaxed)) {
status = Status::ShutdownInProgress("Database shutdown");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
(manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
status = input->status();
}
Expand Down
4 changes: 3 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class CompactionJob {
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback);
Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback,
const std::atomic<bool>* manual_compaction_paused = nullptr);

~CompactionJob();

Expand Down Expand Up @@ -154,6 +155,7 @@ class CompactionJob {
EnvOptions env_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_;
Directory* db_directory_;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
batch_per_txn_(batch_per_txn),
db_lock_(nullptr),
shutting_down_(false),
manual_compaction_paused_(false),
bg_cv_(&mutex_),
logfile_number_(0),
log_dir_synced_(false),
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ class DBImpl : public DB {
virtual Status EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) override;

virtual void EnableManualCompaction() override;
virtual void DisableManualCompaction() override;

using DB::SetOptions;
Status SetOptions(
ColumnFamilyHandle* column_family,
Expand Down Expand Up @@ -1638,6 +1641,7 @@ class DBImpl : public DB {
InstrumentedMutex log_write_mutex_;

std::atomic<bool> shutting_down_;
std::atomic<bool> manual_compaction_paused_;
// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
// * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
Expand Down
46 changes: 42 additions & 4 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,9 @@ Status DBImpl::CompactFilesImpl(
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (manual_compaction_paused_.load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}

std::unordered_set<uint64_t> input_set;
for (const auto& file_name : input_file_names) {
Expand Down Expand Up @@ -1012,7 +1015,8 @@ Status DBImpl::CompactFilesImpl(
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
: nullptr,
&manual_compaction_paused_);

// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
Expand Down Expand Up @@ -1058,6 +1062,12 @@ Status DBImpl::CompactFilesImpl(
// Done
} else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
} else if (status.IsManualCompactionPaused()) {
// Don't report stopping manual compaction as error
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] [JOB %d] Stopping manual compaction",
c->column_family_data()->GetName().c_str(),
job_context->job_id);
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] [JOB %d] Compaction error: %s",
Expand Down Expand Up @@ -1128,6 +1138,10 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
return;
}
Version* current = cfd->current();
current->Ref();
// release lock while notifying events
Expand Down Expand Up @@ -1190,6 +1204,10 @@ void DBImpl::NotifyOnCompactionCompleted(
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
return;
}
Version* current = cfd->current();
current->Ref();
// release lock while notifying events
Expand Down Expand Up @@ -1879,6 +1897,14 @@ Status DBImpl::EnableAutoCompaction(
return s;
}

void DBImpl::DisableManualCompaction() {
manual_compaction_paused_.store(true, std::memory_order_release);
}

void DBImpl::EnableManualCompaction() {
manual_compaction_paused_.store(false, std::memory_order_release);
}

void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld();
if (!opened_successfully_) {
Expand Down Expand Up @@ -2319,6 +2345,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
env_->SleepForMicroseconds(10000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() &&
!s.IsColumnFamilyDropped()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
Expand All @@ -2336,6 +2363,12 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
LogFlush(immutable_db_options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
ManualCompactionState *m = prepicked_compaction->manual_compaction_state;
assert(m);
ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
m->cfd->GetName().c_str(),
job_context.job_id);
}

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
Expand All @@ -2344,7 +2377,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
// have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped());
!s.IsManualCompactionPaused() &&
!s.IsColumnFamilyDropped());
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

// delete unnecessary files if any, this is done outside the mutex
Expand Down Expand Up @@ -2427,6 +2461,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (!error_handler_.IsBGWorkStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (is_manual &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
status = error_handler_.GetBGError();
Expand Down Expand Up @@ -2744,7 +2781,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
immutable_db_options_.max_subcompactions <= 1 &&
c->mutable_cf_options()->snap_refresh_nanos > 0
? &fetch_callback
: nullptr);
: nullptr, is_manual ? &manual_compaction_paused_ : nullptr);
compaction_job.Prepare();

NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
Expand Down Expand Up @@ -2784,7 +2821,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job_stats, job_context->job_id);
}

if (status.ok() || status.IsCompactionTooLarge()) {
if (status.ok() || status.IsCompactionTooLarge() ||
status.IsManualCompactionPaused()) {
// Done
} else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
Expand Down
4 changes: 4 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2737,6 +2737,10 @@ class ModelDB : public DB {
return Status::NotSupported("Not supported operation.");
}

void EnableManualCompaction() override { return; }

void DisableManualCompaction() override { return; }

using DB::NumberLevels;
int NumberLevels(ColumnFamilyHandle* /*column_family*/) override { return 1; }

Expand Down
Loading

0 comments on commit 6226830

Please sign in to comment.