Skip to content

Commit

Permalink
Fix assertion failure in bg flush (facebook#7362)
Browse files Browse the repository at this point in the history
Summary:
facebook#7340 reports and reproduces an assertion failure caused by a combination of the following:
- atomic flush is disabled.
- a column family can appear multiple times in the flush queue at the same time. This behavior was introduced in release 5.17.

Consequently, it is possible that two flushes race with each other. One bg flush thread flushes all memtables. The other thread calls `FlushMemTableToOutputFile()` afterwards, and hits the assertion error below.

```
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());
```

Fix this by reverting the behavior. In non-atomic-flush case, a column family can appear in the flush queue at most once at the same time.

Pull Request resolved: facebook#7362

Test Plan:
make check
Also run stress test successfully for 10 times.
```
make crash_test
```

Reviewed By: ajkr

Differential Revision: D25172996

Pulled By: riversand963

fbshipit-source-id: f1559b6366cc609e961e3fc83fae548f1fad08ce
  • Loading branch information
riversand963 authored and facebook-github-bot committed Dec 2, 2020
1 parent 9e16404 commit e062a71
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 112 deletions.
123 changes: 78 additions & 45 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ Status DBImpl::FlushMemTableToOutputFile(

FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
&mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
GetDataDir(cfd, 0U),
port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker, job_context,
log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
Expand Down Expand Up @@ -313,30 +313,22 @@ Status DBImpl::FlushMemTablesToOutputFiles(
return AtomicFlushMemTablesToOutputFiles(
bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
}
assert(bg_flush_args.size() == 1);
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
Status status;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context = arg.superversion_context_;
Status s = FlushMemTableToOutputFile(
cfd, mutable_cf_options, made_progress, job_context,
superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, log_buffer, thread_pri);
if (!s.ok()) {
status = s;
if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
// At this point, DB is not shutting down, nor is cfd dropped.
// Something is wrong, thus we break out of the loop.
break;
}
}
}
return status;
const auto& bg_flush_arg = bg_flush_args[0];
ColumnFamilyData* cfd = bg_flush_arg.cfd_;
MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context =
bg_flush_arg.superversion_context_;
Status s = FlushMemTableToOutputFile(
cfd, mutable_cf_options, made_progress, job_context, superversion_context,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
log_buffer, thread_pri);
return s;
}

/*
Expand Down Expand Up @@ -399,7 +391,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
jobs.emplace_back(new FlushJob(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
Expand Down Expand Up @@ -1697,8 +1689,9 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason, bool writes_stopped) {
// This method should not be called if atomic_flush is true.
assert(!immutable_db_options_.atomic_flush);
Status s;
uint64_t flush_memtable_id = 0;
if (!flush_options.allow_write_stall) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
Expand All @@ -1708,7 +1701,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}

FlushRequest flush_req;
autovector<FlushRequest> flush_reqs;
autovector<uint64_t> memtable_ids_to_wait;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
Expand All @@ -1730,11 +1724,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
assert(cfd->imm()->NumNotFlushed() > 0);
}
}
const uint64_t flush_memtable_id = port::kMaxUint64;
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
flush_memtable_id = cfd->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd, flush_memtable_id);
FlushRequest req{{cfd, flush_memtable_id}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
}
if (immutable_db_options_.persist_stats_to_disk &&
flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
Expand All @@ -1760,28 +1756,35 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
"to avoid holding old logs",
cfd->GetName().c_str());
s = SwitchMemtable(cfd_stats, &context);
flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd_stats, flush_memtable_id);
FlushRequest req{{cfd_stats, flush_memtable_id}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd->imm()->GetLatestMemTableID());
}
}
}
}
if (s.ok() && !flush_req.empty()) {
for (auto& elem : flush_req) {
ColumnFamilyData* loop_cfd = elem.first;

if (s.ok() && !flush_reqs.empty()) {
for (const auto& req : flush_reqs) {
assert(req.size() == 1);
ColumnFamilyData* loop_cfd = req[0].first;
loop_cfd->imm()->FlushRequested();
}
// If the caller wants to wait for this flush to complete, it indicates
// that the caller expects the ColumnFamilyData not to be free'ed by
// other threads which may drop the column family concurrently.
// Therefore, we increase the cfd's ref count.
if (flush_options.wait) {
for (auto& elem : flush_req) {
ColumnFamilyData* loop_cfd = elem.first;
for (const auto& req : flush_reqs) {
assert(req.size() == 1);
ColumnFamilyData* loop_cfd = req[0].first;
loop_cfd->Ref();
}
}
SchedulePendingFlush(flush_req, flush_reason);
for (const auto& req : flush_reqs) {
SchedulePendingFlush(req, flush_reason);
}
MaybeScheduleFlushOrCompaction();
}

Expand All @@ -1797,9 +1800,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (s.ok() && flush_options.wait) {
autovector<ColumnFamilyData*> cfds;
autovector<const uint64_t*> flush_memtable_ids;
for (auto& iter : flush_req) {
cfds.push_back(iter.first);
flush_memtable_ids.push_back(&(iter.second));
assert(flush_reqs.size() == memtable_ids_to_wait.size());
for (size_t i = 0; i < flush_reqs.size(); ++i) {
assert(flush_reqs[i].size() == 1);
cfds.push_back(flush_reqs[i][0].first);
flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
}
s = WaitForFlushMemTables(
cfds, flush_memtable_ids,
Expand Down Expand Up @@ -2224,6 +2229,17 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty());
FlushRequest flush_req = flush_queue_.front();
flush_queue_.pop_front();
if (!immutable_db_options_.atomic_flush) {
assert(flush_req.size() == 1);
}
for (const auto& elem : flush_req) {
if (!immutable_db_options_.atomic_flush) {
ColumnFamilyData* cfd = elem.first;
assert(cfd);
assert(cfd->queued_for_flush());
cfd->set_queued_for_flush(false);
}
}
// TODO: need to unset flush reason?
return flush_req;
}
Expand Down Expand Up @@ -2256,19 +2272,36 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(

void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
FlushReason flush_reason) {
mutex_.AssertHeld();
if (flush_req.empty()) {
return;
}
for (auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
assert(flush_req.size() == 1);
ColumnFamilyData* cfd = flush_req[0].first;
assert(cfd);
if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
cfd->Ref();
cfd->set_queued_for_flush(true);
cfd->SetFlushReason(flush_reason);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
} else {
for (auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
mutex_.AssertHeld();
if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
Expand Down
38 changes: 29 additions & 9 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1335,10 +1335,17 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
}
for (auto cfd : cfds) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
MaybeScheduleFlushOrCompaction();
}
return status;
Expand Down Expand Up @@ -1414,10 +1421,17 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
for (const auto cfd : cfds) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
return status;
Expand Down Expand Up @@ -1641,10 +1655,16 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
if (status.ok()) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
}
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
return status;
Expand Down
10 changes: 6 additions & 4 deletions db/db_impl/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,12 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
}
};
for (int k = 0; k != 8; ++k) {
ASSERT_OK(
Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
ASSERT_OK(
Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(0 /*cf*/, "key" + std::to_string(k),
"value" + std::to_string(k)));
ASSERT_OK(Put(1 /*cf*/, "key" + std::to_string(k),
"value" + std::to_string(k)));
}
TEST_SYNC_POINT(
"DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
Expand Down
6 changes: 4 additions & 2 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {

void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
count++;
assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
ASSERT_EQ(FlushReason::kWriteBufferManager, flush_job_info.flush_reason);
}
};
std::shared_ptr<TestFlushListener> test_listener =
Expand Down Expand Up @@ -1690,7 +1690,9 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
1 * kMB);
// Write one more key to trigger flush.
ASSERT_OK(Put(0, "foo", "v2"));
dbfull()->TEST_WaitForFlushMemTable();
for (auto* h : handles_) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h));
}
// Flushed two column families.
ASSERT_EQ(2, test_listener->count.load());
}
Expand Down
34 changes: 18 additions & 16 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}

FlushJob::FlushJob(
const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low)
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
uint64_t max_memtable_id, const FileOptions& file_options,
VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
Expand Down
14 changes: 6 additions & 8 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ class FlushJob {
// IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const uint64_t* max_memtable_id, const FileOptions& file_options,
VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
Expand Down Expand Up @@ -110,12 +109,11 @@ class FlushJob {
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
// Pointer to a variable storing the largest memtable id to flush in this
// A variable storing the largest memtable id to flush in this
// flush job. RocksDB uses this variable to select the memtables to flush in
// this job. All memtables in this column family with an ID smaller than or
// equal to *max_memtable_id_ will be selected for flush. If null, then all
// memtables in the column family will be selected.
const uint64_t* max_memtable_id_;
// equal to max_memtable_id_ will be selected for flush.
uint64_t max_memtable_id_;
const FileOptions file_options_;
VersionSet* versions_;
InstrumentedMutex* db_mutex_;
Expand Down
Loading

0 comments on commit e062a71

Please sign in to comment.