From e062a719cc8c7c9aa19c8c58131784b3acaec859 Mon Sep 17 00:00:00 2001
From: Yanqin Jin
Date: Wed, 2 Dec 2020 09:29:50 -0800
Subject: [PATCH] Fix assertion failure in bg flush (#7362)

Summary:
https://github.com/facebook/rocksdb/issues/7340 reports and reproduces an
assertion failure caused by a combination of the following:
- atomic flush is disabled.
- a column family can appear multiple times in the flush queue at the same
  time. This behavior was introduced in release 5.17.

Consequently, it is possible that two flushes race with each other. One bg
flush thread flushes all memtables, and the other thread calls
`FlushMemTableToOutputFile()` afterwards and hits the assertion error below.

```
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
```

Fix this by reverting the behavior: in the non-atomic-flush case, a column
family can appear in the flush queue at most once at the same time.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7362

Test Plan:
make check
Also ran the stress test successfully 10 times:
```
make crash_test
```

Reviewed By: ajkr

Differential Revision: D25172996

Pulled By: riversand963

fbshipit-source-id: f1559b6366cc609e961e3fc83fae548f1fad08ce
---
 db/db_impl/db_impl_compaction_flush.cc | 123 ++++++++++++++++---------
 db/db_impl/db_impl_write.cc            |  38 ++++++--
 db/db_impl/db_secondary_test.cc        |  10 +-
 db/db_wal_test.cc                      |   6 +-
 db/flush_job.cc                        |  34 +++----
 db/flush_job.h                         |  14 ++-
 db/flush_job_test.cc                   |  18 ++--
 db/memtable_list.cc                    |   4 +-
 db/memtable_list.h                     |   2 +-
 db/memtable_list_test.cc               |  32 +++----
 10 files changed, 169 insertions(+), 112 deletions(-)

diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 518cabf1155..90e6be6fbc0 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -159,10 +159,10 @@ Status DBImpl::FlushMemTableToOutputFile(
   FlushJob flush_job(
       dbname_, cfd, immutable_db_options_, mutable_cf_options,
-      nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
-      &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
-      snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
-      GetDataDir(cfd, 0U),
+      port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
+      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
+      earliest_write_conflict_snapshot, snapshot_checker, job_context,
+      log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
       &event_logger_, mutable_cf_options.report_bg_io_stats,
       true /* sync_output_directory */, true /* write_manifest */, thread_pri,
@@ -313,30 +313,22 @@ Status DBImpl::FlushMemTablesToOutputFiles(
     return AtomicFlushMemTablesToOutputFiles(
         bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
   }
+  assert(bg_flush_args.size() == 1);
   std::vector<SequenceNumber> snapshot_seqs;
   SequenceNumber earliest_write_conflict_snapshot;
   SnapshotChecker* snapshot_checker;
   GetSnapshotContext(job_context, &snapshot_seqs,
                      &earliest_write_conflict_snapshot, &snapshot_checker);
-  Status status;
-  for (auto& arg : bg_flush_args) {
-    ColumnFamilyData* cfd = arg.cfd_;
-    MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
-    SuperVersionContext* superversion_context = arg.superversion_context_;
-    Status s = FlushMemTableToOutputFile(
-        cfd, mutable_cf_options, made_progress, job_context,
-        superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
-        snapshot_checker, log_buffer, thread_pri);
-    if (!s.ok()) {
-      status = s;
-      if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
-        // At this point, DB is not shutting down, nor is cfd dropped.
-        // Something is wrong, thus we break out of the loop.
-        break;
-      }
-    }
-  }
-  return status;
+  const auto& bg_flush_arg = bg_flush_args[0];
+  ColumnFamilyData* cfd = bg_flush_arg.cfd_;
+  MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+  SuperVersionContext* superversion_context =
+      bg_flush_arg.superversion_context_;
+  Status s = FlushMemTableToOutputFile(
+      cfd, mutable_cf_options, made_progress, job_context, superversion_context,
+      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
+      log_buffer, thread_pri);
+  return s;
 }

 /*
@@ -399,7 +391,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
-    const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
+    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
     jobs.emplace_back(new FlushJob(
         dbname_, cfd, immutable_db_options_, mutable_cf_options,
         max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
@@ -1697,8 +1689,9 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                              const FlushOptions& flush_options,
                              FlushReason flush_reason, bool writes_stopped) {
+  // This method should not be called if atomic_flush is true.
+  assert(!immutable_db_options_.atomic_flush);
   Status s;
-  uint64_t flush_memtable_id = 0;
   if (!flush_options.allow_write_stall) {
     bool flush_needed = true;
     s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
@@ -1708,7 +1701,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     }
   }

-  FlushRequest flush_req;
+  autovector<FlushRequest> flush_reqs;
+  autovector<uint64_t> memtable_ids_to_wait;
   {
     WriteContext context;
     InstrumentedMutexLock guard_lock(&mutex_);
@@ -1730,11 +1724,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
         assert(cfd->imm()->NumNotFlushed() > 0);
       }
     }
+    const uint64_t flush_memtable_id = port::kMaxUint64;
     if (s.ok()) {
       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
           !cached_recoverable_state_empty_.load()) {
-        flush_memtable_id = cfd->imm()->GetLatestMemTableID();
-        flush_req.emplace_back(cfd, flush_memtable_id);
+        FlushRequest req{{cfd, flush_memtable_id}};
+        flush_reqs.emplace_back(std::move(req));
+        memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
       }
       if (immutable_db_options_.persist_stats_to_disk &&
           flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
@@ -1760,15 +1756,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                        "to avoid holding old logs",
                        cfd->GetName().c_str());
         s = SwitchMemtable(cfd_stats, &context);
-        flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
-        flush_req.emplace_back(cfd_stats, flush_memtable_id);
+        FlushRequest req{{cfd_stats, flush_memtable_id}};
+        flush_reqs.emplace_back(std::move(req));
+        memtable_ids_to_wait.emplace_back(
+            cfd_stats->imm()->GetLatestMemTableID());
       }
     }
   }
-  if (s.ok() && !flush_req.empty()) {
-    for (auto& elem : flush_req) {
-      ColumnFamilyData* loop_cfd = elem.first;
+
+  if (s.ok() && !flush_reqs.empty()) {
+    for (const auto& req : flush_reqs) {
+      assert(req.size() == 1);
+      ColumnFamilyData* loop_cfd = req[0].first;
       loop_cfd->imm()->FlushRequested();
     }
     // If the caller wants to wait for this flush to complete, it indicates
@@ -1776,12 +1776,15 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     // other threads which may drop the column family concurrently.
     // Therefore, we increase the cfd's ref count.
     if (flush_options.wait) {
-      for (auto& elem : flush_req) {
-        ColumnFamilyData* loop_cfd = elem.first;
+      for (const auto& req : flush_reqs) {
+        assert(req.size() == 1);
+        ColumnFamilyData* loop_cfd = req[0].first;
         loop_cfd->Ref();
       }
     }
-    SchedulePendingFlush(flush_req, flush_reason);
+    for (const auto& req : flush_reqs) {
+      SchedulePendingFlush(req, flush_reason);
+    }
     MaybeScheduleFlushOrCompaction();
   }
@@ -1797,9 +1800,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
   if (s.ok() && flush_options.wait) {
     autovector<ColumnFamilyData*> cfds;
     autovector<const uint64_t*> flush_memtable_ids;
-    for (auto& iter : flush_req) {
-      cfds.push_back(iter.first);
-      flush_memtable_ids.push_back(&(iter.second));
+    assert(flush_reqs.size() == memtable_ids_to_wait.size());
+    for (size_t i = 0; i < flush_reqs.size(); ++i) {
+      assert(flush_reqs[i].size() == 1);
+      cfds.push_back(flush_reqs[i][0].first);
+      flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
     }
     s = WaitForFlushMemTables(
         cfds, flush_memtable_ids,
@@ -2224,6 +2229,17 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
   assert(!flush_queue_.empty());
   FlushRequest flush_req = flush_queue_.front();
   flush_queue_.pop_front();
+  if (!immutable_db_options_.atomic_flush) {
+    assert(flush_req.size() == 1);
+  }
+  for (const auto& elem : flush_req) {
+    if (!immutable_db_options_.atomic_flush) {
+      ColumnFamilyData* cfd = elem.first;
+      assert(cfd);
+      assert(cfd->queued_for_flush());
+      cfd->set_queued_for_flush(false);
+    }
+  }
   // TODO: need to unset flush reason?
   return flush_req;
 }
@@ -2256,19 +2272,36 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
                                   FlushReason flush_reason) {
+  mutex_.AssertHeld();
   if (flush_req.empty()) {
     return;
   }
-  for (auto& iter : flush_req) {
-    ColumnFamilyData* cfd = iter.first;
-    cfd->Ref();
-    cfd->SetFlushReason(flush_reason);
+  if (!immutable_db_options_.atomic_flush) {
+    // For the non-atomic flush case, we never schedule multiple column
+    // families in the same flush request.
+    assert(flush_req.size() == 1);
+    ColumnFamilyData* cfd = flush_req[0].first;
+    assert(cfd);
+    if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
+      cfd->Ref();
+      cfd->set_queued_for_flush(true);
+      cfd->SetFlushReason(flush_reason);
+      ++unscheduled_flushes_;
+      flush_queue_.push_back(flush_req);
+    }
+  } else {
+    for (auto& iter : flush_req) {
+      ColumnFamilyData* cfd = iter.first;
+      cfd->Ref();
+      cfd->SetFlushReason(flush_reason);
+    }
+    ++unscheduled_flushes_;
+    flush_queue_.push_back(flush_req);
   }
-  ++unscheduled_flushes_;
-  flush_queue_.push_back(flush_req);
 }

 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
+  mutex_.AssertHeld();
   if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
     AddToCompactionQueue(cfd);
     ++unscheduled_compactions_;
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 1cab2b6c050..02ef5671e98 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1335,10 +1335,17 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
   }
   for (auto cfd : cfds) {
     cfd->imm()->FlushRequested();
+    if (!immutable_db_options_.atomic_flush) {
+      FlushRequest flush_req;
+      GenerateFlushRequest({cfd}, &flush_req);
+      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
+    }
+  }
+  if (immutable_db_options_.atomic_flush) {
+    FlushRequest flush_req;
+    GenerateFlushRequest(cfds, &flush_req);
+    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
   }
-  FlushRequest flush_req;
-  GenerateFlushRequest(cfds, &flush_req);
-  SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
   MaybeScheduleFlushOrCompaction();
 }
 return status;
@@ -1414,10 +1421,17 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
   }
   for (const auto cfd : cfds) {
     cfd->imm()->FlushRequested();
+    if (!immutable_db_options_.atomic_flush) {
+      FlushRequest flush_req;
+      GenerateFlushRequest({cfd}, &flush_req);
+      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+    }
+  }
+  if (immutable_db_options_.atomic_flush) {
+    FlushRequest flush_req;
+    GenerateFlushRequest(cfds, &flush_req);
+    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
   }
-  FlushRequest flush_req;
-  GenerateFlushRequest(cfds, &flush_req);
-  SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
   MaybeScheduleFlushOrCompaction();
 }
 return status;
@@ -1641,10 +1655,16 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
   if (status.ok()) {
     if (immutable_db_options_.atomic_flush) {
       AssignAtomicFlushSeq(cfds);
+      FlushRequest flush_req;
+      GenerateFlushRequest(cfds, &flush_req);
+      SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+    } else {
+      for (auto* cfd : cfds) {
+        FlushRequest flush_req;
+        GenerateFlushRequest({cfd}, &flush_req);
+        SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
+      }
     }
-    FlushRequest flush_req;
-    GenerateFlushRequest(cfds, &flush_req);
-    SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
     MaybeScheduleFlushOrCompaction();
   }
   return status;
diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc
index bad0153a665..281cf76be68 100644
--- a/db/db_impl/db_secondary_test.cc
+++ b/db/db_impl/db_secondary_test.cc
@@ -748,10 +748,12 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
     }
   };
   for (int k = 0; k != 8; ++k) {
-    ASSERT_OK(
-        Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
-    ASSERT_OK(
-        Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
+    for (int j = 0; j < 2; ++j) {
+      ASSERT_OK(Put(0 /*cf*/, "key" + std::to_string(k),
+                    "value" + std::to_string(k)));
+      ASSERT_OK(Put(1 /*cf*/, "key" + std::to_string(k),
+                    "value" + std::to_string(k)));
+    }
     TEST_SYNC_POINT(
         "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
     ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index f5f97dc366c..eb7481ab676 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -1646,7 +1646,7 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
     void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
       count++;
-      assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
+      ASSERT_EQ(FlushReason::kWriteBufferManager, flush_job_info.flush_reason);
     }
   };
   std::shared_ptr<TestFlushListener> test_listener =
@@ -1690,7 +1690,9 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
       1 * kMB);
   // Write one more key to trigger flush.
   ASSERT_OK(Put(0, "foo", "v2"));
-  dbfull()->TEST_WaitForFlushMemTable();
+  for (auto* h : handles_) {
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h));
+  }
   // Flushed two column families.
   ASSERT_EQ(2, test_listener->count.load());
 }
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 6b943a5670b..d596dc06b01 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -80,22 +80,24 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
   }
 }

-FlushJob::FlushJob(
-    const std::string& dbname, ColumnFamilyData* cfd,
-    const ImmutableDBOptions& db_options,
-    const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id,
-    const FileOptions& file_options, VersionSet* versions,
-    InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
-    std::vector<SequenceNumber> existing_snapshots,
-    SequenceNumber earliest_write_conflict_snapshot,
-    SnapshotChecker* snapshot_checker, JobContext* job_context,
-    LogBuffer* log_buffer, FSDirectory* db_directory,
-    FSDirectory* output_file_directory, CompressionType output_compression,
-    Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
-    const bool sync_output_directory, const bool write_manifest,
-    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
-    const std::string& db_id, const std::string& db_session_id,
-    std::string full_history_ts_low)
+FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
+                   const ImmutableDBOptions& db_options,
+                   const MutableCFOptions& mutable_cf_options,
+                   uint64_t max_memtable_id, const FileOptions& file_options,
+                   VersionSet* versions, InstrumentedMutex* db_mutex,
+                   std::atomic<bool>* shutting_down,
+                   std::vector<SequenceNumber> existing_snapshots,
+                   SequenceNumber earliest_write_conflict_snapshot,
+                   SnapshotChecker* snapshot_checker, JobContext* job_context,
+                   LogBuffer* log_buffer, FSDirectory* db_directory,
+                   FSDirectory* output_file_directory,
+                   CompressionType output_compression, Statistics* stats,
+                   EventLogger* event_logger, bool measure_io_stats,
+                   const bool sync_output_directory, const bool write_manifest,
+                   Env::Priority thread_pri,
+                   const std::shared_ptr<IOTracer>& io_tracer,
+                   const std::string& db_id, const std::string& db_session_id,
+                   std::string full_history_ts_low)
     : dbname_(dbname),
       db_id_(db_id),
       db_session_id_(db_session_id),
diff --git a/db/flush_job.h b/db/flush_job.h
index 785cfc9bc6b..e3623209fcf 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -60,10 +60,9 @@ class FlushJob {
   // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
   FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
            const ImmutableDBOptions& db_options,
-           const MutableCFOptions& mutable_cf_options,
-           const uint64_t* max_memtable_id, const FileOptions& file_options,
-           VersionSet* versions, InstrumentedMutex* db_mutex,
-           std::atomic<bool>* shutting_down,
+           const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
+           const FileOptions& file_options, VersionSet* versions,
+           InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
            std::vector<SequenceNumber> existing_snapshots,
            SequenceNumber earliest_write_conflict_snapshot,
            SnapshotChecker* snapshot_checker, JobContext* job_context,
@@ -110,12 +109,11 @@ class FlushJob {
   ColumnFamilyData* cfd_;
   const ImmutableDBOptions& db_options_;
   const MutableCFOptions& mutable_cf_options_;
-  // Pointer to a variable storing the largest memtable id to flush in this
+  // A variable storing the largest memtable id to flush in this
   // flush job. RocksDB uses this variable to select the memtables to flush in
   // this job. All memtables in this column family with an ID smaller than or
-  // equal to *max_memtable_id_ will be selected for flush. If null, then all
-  // memtables in the column family will be selected.
-  const uint64_t* max_memtable_id_;
+  // equal to max_memtable_id_ will be selected for flush.
+  uint64_t max_memtable_id_;
   const FileOptions file_options_;
   VersionSet* versions_;
   InstrumentedMutex* db_mutex_;
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index 6ac6a2e8051..2ac569f7743 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -158,7 +158,7 @@ TEST_F(FlushJobTest, Empty) {
   SnapshotChecker* snapshot_checker = nullptr;  // not relevant
   FlushJob flush_job(
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
-      *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
+      *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */,
       env_options_, versions_.get(), &mutex_, &shutting_down_, {},
       kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
       nullptr, kNoCompression, nullptr, &event_logger, false,
@@ -240,7 +240,7 @@ TEST_F(FlushJobTest, NonEmpty) {
   SnapshotChecker* snapshot_checker = nullptr;  // not relevant
   FlushJob flush_job(
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
-      *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
+      *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */,
       env_options_, versions_.get(), &mutex_, &shutting_down_, {},
       kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
       nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
@@ -302,7 +302,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
   uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
   FlushJob flush_job(
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
-      *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_,
+      *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_,
       versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
       snapshot_checker, &job_context, nullptr, nullptr, nullptr,
       kNoCompression, db_options_.statistics.get(), &event_logger, true,
@@ -374,7 +374,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
     std::vector<SequenceNumber> snapshot_seqs;
     flush_jobs.emplace_back(new FlushJob(
         dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
-        &memtable_ids[k], env_options_, versions_.get(), &mutex_,
+        memtable_ids[k], env_options_, versions_.get(), &mutex_,
         &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
         &job_context, nullptr, nullptr, nullptr, kNoCompression,
        db_options_.statistics.get(), &event_logger, true,
@@ -491,7 +491,7 @@ TEST_F(FlushJobTest, Snapshots) {
   SnapshotChecker* snapshot_checker = nullptr;  // not relevant
   FlushJob flush_job(
       dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
-      *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
+      *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */,
       env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots,
       kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
       nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
@@ -558,8 +558,8 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) {
   PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
   FlushJob flush_job(
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
-      nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
-      &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
+      port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(),
+      &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
       &job_context, nullptr, nullptr, nullptr, kNoCompression,
       db_options_.statistics.get(), &event_logger, true,
       true /* sync_output_directory */, true /* write_manifest */,
@@ -609,8 +609,8 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
   PutFixed64(&full_history_ts_low, 0);
   FlushJob flush_job(
       dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
-      nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
-      &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
+      port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(),
+      &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
       &job_context, nullptr, nullptr, nullptr, kNoCompression,
       db_options_.statistics.get(), &event_logger, true,
       true /* sync_output_directory */, true /* write_manifest */,
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index f2974ec04e7..ffb4d75028c 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -334,7 +334,7 @@ bool MemTableList::IsFlushPending() const {
 }

 // Returns the memtables that need to be flushed.
-void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
+void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
                                         autovector<MemTable*>* ret) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
@@ -345,7 +345,7 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
     if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
       atomic_flush = true;
     }
-    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
+    if (m->GetID() > max_memtable_id) {
       break;
     }
     if (!m->flush_in_progress_) {
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 0af7a0cea41..62e03cf5366 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -251,7 +251,7 @@ class MemTableList {
   // Returns the earliest memtables that need to be flushed. The returned
   // memtables are guaranteed to be in the ascending order of created time.
-  void PickMemtablesToFlush(const uint64_t* max_memtable_id,
+  void PickMemtablesToFlush(uint64_t max_memtable_id,
                             autovector<MemTable*>* mems);

   // Reset status of the given memtable list back to pending state so that
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 6f578e7c71c..e3b7eb621ba 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -199,7 +199,7 @@ TEST_F(MemTableListTest, Empty) {
   ASSERT_FALSE(list.IsFlushPending());

   autovector<MemTable*> mems;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &mems);
   ASSERT_EQ(0, mems.size());

   autovector<MemTable*> to_delete;
@@ -399,7 +399,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Flush this memtable from the list.
   // (It will then be a part of the memtable history).
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());

   MutableCFOptions mutable_cf_options(options);
@@ -451,7 +451,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   ASSERT_EQ(0, to_delete.size());

   to_flush.clear();
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());

   // Flush second memtable
@@ -567,7 +567,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());

   // Request a flush even though there is nothing to flush
   list.FlushRequested();
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));

   // Attempt to 'flush' to clear request for flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
@@ -600,7 +600,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));

   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(2, to_flush.size());
   ASSERT_EQ(2, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -621,7 +621,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());

   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   ASSERT_EQ(3, to_flush.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());

   // Pick tables to flush again
   autovector<MemTable*> to_flush2;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush2);
   ASSERT_EQ(0, to_flush2.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -647,7 +647,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
   // Pick tables to flush again
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush2);
   ASSERT_EQ(1, to_flush2.size());
   ASSERT_EQ(4, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -668,7 +668,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());

   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush);
   // Should pick 4 of 5 since 1 table has been picked in to_flush2
   ASSERT_EQ(4, to_flush.size());
   ASSERT_EQ(5, list.NumNotFlushed());
@@ -677,7 +677,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {

   // Pick tables to flush again
   autovector<MemTable*> to_flush3;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
+  list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush3);
   ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
   ASSERT_EQ(5, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -738,7 +738,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   autovector<MemTable*> to_flush4;
   list.FlushRequested();
   ASSERT_TRUE(list.HasFlushRequested());
-  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
+  list.PickMemtablesToFlush(memtable_id, &to_flush4);
   ASSERT_TRUE(to_flush4.empty());
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
@@ -749,7 +749,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   // equal to 5. Therefore, only tables[5] will be selected.
   memtable_id = 5;
   list.FlushRequested();
-  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
+  list.PickMemtablesToFlush(memtable_id, &to_flush4);
   ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
@@ -841,7 +841,8 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
     auto* list = lists[i];
     ASSERT_FALSE(list->IsFlushPending());
     ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
-    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
+    list->PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */,
+                               &flush_candidates[i]);
     ASSERT_EQ(0, flush_candidates[i].size());
   }
   // Request flush even though there is nothing to flush
@@ -871,8 +872,7 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
   // Pick memtables to flush
   for (auto i = 0; i != num_cfs; ++i) {
     flush_candidates[i].clear();
-    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
-                                   &flush_candidates[i]);
+    lists[i]->PickMemtablesToFlush(flush_memtable_ids[i], &flush_candidates[i]);
     ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
               static_cast<uint64_t>(flush_candidates[i].size()));
   }
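
Illustration: the toy program below sketches the race described in the summary and the at-most-once queueing invariant this patch restores. It is a minimal sketch, not RocksDB code; `ToyColumnFamily`, `ScheduleFlushOld`, and `ScheduleFlushNew` are hypothetical stand-ins for `ColumnFamilyData` and the pre-/post-fix `SchedulePendingFlush()`.

```
// Toy model of issue #7340: one column family queued twice, flushed once.
#include <cassert>
#include <deque>
#include <iostream>

struct ToyColumnFamily {
  int num_not_flushed = 0;        // memtables waiting to be flushed
  bool queued_for_flush = false;  // hypothetical stand-in for the new flag
};

std::deque<ToyColumnFamily*> flush_queue;

// Pre-fix scheduling: every request is enqueued, so one column family can
// occupy two queue slots at the same time.
void ScheduleFlushOld(ToyColumnFamily* cf) { flush_queue.push_back(cf); }

// Post-fix scheduling (non-atomic flush): enqueue at most once.
void ScheduleFlushNew(ToyColumnFamily* cf) {
  if (!cf->queued_for_flush && cf->num_not_flushed > 0) {
    cf->queued_for_flush = true;
    flush_queue.push_back(cf);
  }
}

// A background flush pops one request and flushes *all* pending memtables,
// which is how the first bg flush can leave nothing for the second one.
void BackgroundFlush(bool fixed) {
  ToyColumnFamily* cf = flush_queue.front();
  flush_queue.pop_front();
  if (fixed) {
    cf->queued_for_flush = false;  // mirrors PopFirstFromFlushQueue()
  }
  assert(cf->num_not_flushed != 0);  // the assertion from the bug report
  cf->num_not_flushed = 0;
}

int main() {
  ToyColumnFamily cf;

  // Old behavior: two memtable switches -> two queue entries for one family.
  cf.num_not_flushed = 1;
  ScheduleFlushOld(&cf);
  cf.num_not_flushed = 2;
  ScheduleFlushOld(&cf);
  BackgroundFlush(false);    // first bg flush flushes both memtables
  // BackgroundFlush(false); // uncomment: pops the stale entry, assert fires

  flush_queue.clear();  // discard the stale entry before the fixed run

  // New behavior: the second request is absorbed by the queued entry.
  cf.num_not_flushed = 1;
  ScheduleFlushNew(&cf);
  cf.num_not_flushed = 2;
  ScheduleFlushNew(&cf);  // no-op: cf is already queued
  BackgroundFlush(true);  // one flush covers both memtables
  std::cout << "entries left in queue: " << flush_queue.size() << std::endl;
  return 0;
}
```

In the patch itself, the corresponding guard lives in `DBImpl::SchedulePendingFlush()` (skip enqueueing when `cfd->queued_for_flush()` is already set) and the flag is cleared in `PopFirstFromFlushQueue()`, both while holding `mutex_` (hence the added `mutex_.AssertHeld()` checks).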