Skip to content

Commit

Permalink
Support concurrent CF iteration and drop (facebook#6147)
Browse files Browse the repository at this point in the history
Summary:
It's easy to cause coredump when closing ColumnFamilyHandle with unreleased iterators, especially iterators release is controlled by java GC when using JNI.

This patch fixed concurrent CF iteration and drop, we let iterators(actually SuperVersion) hold a ColumnFamilyData reference to prevent the CF from being released too early.

fixed facebook#5982
Pull Request resolved: facebook#6147

Differential Revision: D18926378

fbshipit-source-id: 1dff6d068c603d012b81446812368bfee95a5e15
  • Loading branch information
javeme authored and facebook-github-bot committed Dec 13, 2019
1 parent 4b74035 commit c2029f9
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 90 deletions.
70 changes: 43 additions & 27 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
JobContext job_context(0);
mutex_->Lock();
if (cfd_->Unref()) {
bool dropped = cfd_->IsDropped();

delete cfd_;

bool dropped = cfd_->IsDropped();
if (cfd_->UnrefAndTryDelete()) {
if (dropped) {
db_->FindObsoleteFiles(&job_context, false, true);
}
Expand Down Expand Up @@ -439,13 +436,18 @@ void SuperVersion::Cleanup() {
to_delete.push_back(m);
}
current->Unref();
if (cfd->Unref()) {
delete cfd;
}
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current) {
cfd = new_cfd;
mem = new_mem;
imm = new_imm;
current = new_current;
cfd->Ref();
mem->Ref();
imm->Ref();
current->Ref();
Expand Down Expand Up @@ -581,21 +583,7 @@ ColumnFamilyData::~ColumnFamilyData() {
// compaction_queue_ and we destroyed it
assert(!queued_for_flush_);
assert(!queued_for_compaction_);

if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();

bool is_last_reference __attribute__((__unused__));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
assert(super_version_ == nullptr);

if (dummy_versions_ != nullptr) {
// List must be empty
Expand All @@ -615,6 +603,36 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}

bool ColumnFamilyData::UnrefAndTryDelete() {
int old_refs = refs_.fetch_sub(1);
assert(old_refs > 0);

if (old_refs == 1) {
assert(super_version_ == nullptr);
delete this;
return true;
}

if (old_refs == 2 && super_version_ != nullptr) {
// Only the super_version_ holds me
SuperVersion* sv = super_version_;
super_version_ = nullptr;
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
sv->db_mutex->Unlock();
local_sv_.reset();
sv->db_mutex->Lock();

if (sv->Unref()) {
// May delete this ColumnFamilyData after calling Cleanup()
sv->Cleanup();
delete sv;
return true;
}
}
return false;
}

void ColumnFamilyData::SetDropped() {
// can't drop default CF
assert(id_ != 0);
Expand Down Expand Up @@ -1169,7 +1187,7 @@ void ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(mem_, imm_.current(), current_);
new_superversion->Init(this, mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
Expand Down Expand Up @@ -1344,14 +1362,12 @@ ColumnFamilySet::~ColumnFamilySet() {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
bool last_ref __attribute__((__unused__));
last_ref = cfd->Unref();
last_ref = cfd->UnrefAndTryDelete();
assert(last_ref);
delete cfd;
}
bool dummy_last_ref __attribute__((__unused__));
dummy_last_ref = dummy_cfd_->Unref();
dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
assert(dummy_last_ref);
delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
Expand Down
10 changes: 8 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
struct SuperVersion {
// Accessing members of this class is not thread-safe and requires external
// synchronization (ie db mutex held or on write thread).
ColumnFamilyData* cfd;
MemTable* mem;
MemTableListVersion* imm;
Version* current;
Expand All @@ -221,8 +222,8 @@ struct SuperVersion {
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
void Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);

// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
Expand Down Expand Up @@ -288,6 +289,11 @@ class ColumnFamilyData {
return old_refs == 1;
}

// UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only
// be called while holding a DB mutex, or during single-threaded recovery.
bool UnrefAndTryDelete();

// SetDropped() can only be called under following conditions:
// 1) Holding a DB mutex,
// 2) from single-threaded write thread, AND
Expand Down
4 changes: 1 addition & 3 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ Compaction::~Compaction() {
input_version_->Unref();
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
cfd_->UnrefAndTryDelete();
}
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down
15 changes: 5 additions & 10 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ Status DBImpl::ResumeImpl() {
mutex_.Unlock();
s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -418,7 +418,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
}
}
}
Expand Down Expand Up @@ -475,17 +475,12 @@ Status DBImpl::CloseHelper() {
while (!flush_queue_.empty()) {
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (cfd->Unref()) {
delete cfd;
}
iter.first->UnrefAndTryDelete();
}
}
while (!compaction_queue_.empty()) {
auto cfd = PopFirstFromCompactionQueue();
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}

if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
Expand Down Expand Up @@ -4303,7 +4298,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
}
}
for (auto cfd : cfd_list) {
cfd->Unref();
cfd->UnrefAndTryDelete();
}
}
return s;
Expand Down
26 changes: 8 additions & 18 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1618,12 +1618,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
if (tmp_cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete tmp_cfd;
}
tmp_cfd->UnrefAndTryDelete();
}
}
TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
Expand Down Expand Up @@ -1683,7 +1680,7 @@ Status DBImpl::AtomicFlushMemTables(
}
cfd->Ref();
s = SwitchMemtable(cfd, &context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -1723,12 +1720,9 @@ Status DBImpl::AtomicFlushMemTables(
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* cfd : cfds) {
if (cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}
return s;
Expand Down Expand Up @@ -2209,16 +2203,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
arg.cfd_ = nullptr;
}
}
}
for (auto cfd : column_families_not_to_flush) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
return status;
}
Expand Down Expand Up @@ -2547,10 +2538,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// reference).
// This will all happen under a mutex so we don't have to be afraid of
// somebody else deleting it.
if (cfd->Unref()) {
if (cfd->UnrefAndTryDelete()) {
// This was the last reference of the column family, so no need to
// compact.
delete cfd;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
ColumnFamilyData* cfd;

while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfd->Unref();
cfd->UnrefAndTryDelete();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
Expand Down
10 changes: 4 additions & 6 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down Expand Up @@ -1335,7 +1335,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down Expand Up @@ -1525,8 +1525,7 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
assert(context->superversion_context.new_superversion.get() != nullptr);
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);

if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
}
Expand Down Expand Up @@ -1558,8 +1557,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
if (!cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context);
}
if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
if (!status.ok()) {
Expand Down
8 changes: 2 additions & 6 deletions db/flush_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
}

// no longer relevant, retry
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}

Expand All @@ -80,9 +78,7 @@ bool FlushScheduler::Empty() {
void FlushScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
assert(head_.load(std::memory_order_relaxed) == nullptr);
}
Expand Down
9 changes: 2 additions & 7 deletions db/trim_history_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() {
// success
return cfd;
}
if (cfd->Unref()) {
// no longer relevant, retry
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}

Expand All @@ -49,9 +46,7 @@ bool TrimHistoryScheduler::Empty() {
void TrimHistoryScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
assert(Empty());
}
Expand Down
Loading

0 comments on commit c2029f9

Please sign in to comment.