Skip to content

Commit

Permalink
Speed up FindObsoleteFiles
Browse files Browse the repository at this point in the history
Summary:
Here's one solution we discussed on speeding up FindObsoleteFiles. Keep a set of all files in DBImpl and update the set every time we create a file. I probably missed a few other spots where we create a file.

It might speed things up a bit, but makes code uglier. I don't really like it.

A much better approach would be to abstract all file handling into a separate class. Think of it as a layer between DBImpl and Env. Having a separate class deal with file naming and deletion would benefit both code cleanliness (especially with huge DBImpl) and speed things up. It will take a huge effort to do this, though.

Let's discuss offline today.

Test Plan: Ran ./db_stress, verified that files are getting deleted

Reviewers: dhruba, haobo, kailiu, emayanke

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D13827
  • Loading branch information
igorcanadi committed Nov 8, 2013
1 parent dd218bb commit 1510339
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 124 deletions.
12 changes: 9 additions & 3 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ Status DBImpl::DisableFileDeletions() {
}

Status DBImpl::EnableFileDeletions() {
MutexLock l(&mutex_);
disable_delete_obsolete_files_ = false;
Log(options_.info_log, "File Deletions Enabled");
DeletionState deletion_state;
{
MutexLock l(&mutex_);
disable_delete_obsolete_files_ = false;
Log(options_.info_log, "File Deletions Enabled");
FindObsoleteFiles(deletion_state, true);
}
PurgeObsoleteFiles(deletion_state);
LogFlush(options_.info_log);
return Status::OK();
}

Expand Down
186 changes: 85 additions & 101 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,6 @@ struct DBImpl::CompactionState {
}
};

struct DBImpl::DeletionState {

// the list of all live files that cannot be deleted
std::vector<uint64_t> live;

// a list of all files that exist in the db directory
std::vector<std::string> allfiles;

// the current filenumber, lognumber and prevlognumber
// that corresponds to the set of files in 'live'.
uint64_t filenumber, lognumber, prevlognumber;

// the list of all files to be evicted from the table cache
std::vector<uint64_t> files_to_evict;
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
Expand Down Expand Up @@ -451,20 +435,25 @@ void DBImpl::MaybeDumpStats() {
}
}

// Returns the list of live files in 'live' and the list
// Returns the list of live files in 'sstlive' and the list
// of all files in the filesystem in 'allfiles'.
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, bool force) {
mutex_.AssertHeld();

// if deletion is disabled, do nothing
if (disable_delete_obsolete_files_) {
return;
}

// store the current filenum, lognum, etc
deletion_state.manifest_file_number = versions_->ManifestFileNumber();
deletion_state.log_number = versions_->LogNumber();
deletion_state.prev_log_number = versions_->PrevLogNumber();

// This method is costly when the number of files is large.
// Do not allow it to trigger more often than once in
// delete_obsolete_files_period_micros.
if (options_.delete_obsolete_files_period_micros != 0) {
if (!force && options_.delete_obsolete_files_period_micros != 0) {
const uint64_t now_micros = env_->NowMicros();
if (delete_obsolete_files_last_run_ +
options_.delete_obsolete_files_period_micros > now_micros) {
Expand All @@ -475,9 +464,9 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {

// Make a list of all of the live files; set is slow, should not
// be used.
deletion_state.live.assign(pending_outputs_.begin(),
pending_outputs_.end());
versions_->AddLiveFiles(&deletion_state.live);
deletion_state.sstlive.assign(pending_outputs_.begin(),
pending_outputs_.end());
versions_->AddLiveFiles(&deletion_state.sstlive);

// set of all files in the directory
env_->GetChildren(dbname_, &deletion_state.allfiles); // Ignore errors
Expand All @@ -492,59 +481,51 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
log_files.end()
);
}

// store the current filenum, lognum, etc
deletion_state.filenumber = versions_->ManifestFileNumber();
deletion_state.lognumber = versions_->LogNumber();
deletion_state.prevlognumber = versions_->PrevLogNumber();
}

Status DBImpl::DeleteLogFile(uint64_t number) {
Status s;
auto filename = LogFileName(options_.wal_dir, number);
if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
s = env_->RenameFile(filename,
ArchivedLogFileName(options_.wal_dir, number));

if (!s.ok()) {
Log(options_.info_log, "RenameFile logfile #%lu FAILED", number);
}
} else {
s = env_->DeleteFile(filename);
if(!s.ok()) {
Log(options_.info_log, "Delete logfile #%lu FAILED", number);
}
}

return s;
}

// Diffs the files listed in filenames and those that do not
belong to live files are possibly removed. If the removed file
// is a sst file, then it returns the file number in files_to_evict.
belong to live files are possibly removed. Also, removes all the
// files in sstdeletefiles and logdeletefiles.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// if deletion is disabled, do nothing
if (disable_delete_obsolete_files_) {
return;
}

uint64_t number;
FileType type;
std::vector<std::string> old_log_files;

// Now, convert live list to an unordered set, WITHOUT mutex held;
// set is slow.
std::unordered_set<uint64_t> live_set(state.live.begin(),
state.live.end());
std::unordered_set<uint64_t> live_set(state.sstlive.begin(),
state.sstlive.end());

state.allfiles.reserve(state.allfiles.size() + state.sstdeletefiles.size());
for (auto filenum : state.sstdeletefiles) {
state.allfiles.push_back(TableFileName("", filenum));
}

state.allfiles.reserve(state.allfiles.size() + state.logdeletefiles.size());
for (auto filenum : state.logdeletefiles) {
if (filenum > 0) {
state.allfiles.push_back(LogFileName("", filenum));
}
}

for (size_t i = 0; i < state.allfiles.size(); i++) {
if (ParseFileName(state.allfiles[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= state.lognumber) ||
(number == state.prevlognumber));
keep = ((number >= state.log_number) ||
(number == state.prev_log_number));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= state.filenumber);
keep = (number >= state.manifest_file_number);
break;
case kTableFile:
keep = (live_set.find(number) != live_set.end());
Expand All @@ -570,19 +551,25 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {

if (!keep) {
if (type == kTableFile) {
// record the files to be evicted from the cache
state.files_to_evict.push_back(number);
// evict from cache
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lu", int(type), number);

if (type == kLogFile) {
DeleteLogFile(number);
Status st;
if (type == kLogFile && (options_.WAL_ttl_seconds > 0 ||
options_.WAL_size_limit_MB > 0)) {
st = env_->RenameFile(dbname_ + "/" + state.allfiles[i],
ArchivedLogFileName(options_.wal_dir,
number));
if (!st.ok()) {
Log(options_.info_log, "RenameFile logfile #%lu FAILED", number);
}
} else {
Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
if (!st.ok()) {
Log(options_.info_log, "Delete type=%d #%lld FAILED\n",
int(type),
static_cast<unsigned long long>(number));
Log(options_.info_log, "Delete type=%d #%lu FAILED\n",
int(type), number);
}
}
}
Expand All @@ -605,20 +592,14 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
}
}
PurgeObsoleteWALFiles();
}

void DBImpl::EvictObsoleteFiles(DeletionState& state) {
for (unsigned int i = 0; i < state.files_to_evict.size(); i++) {
table_cache_->Evict(state.files_to_evict[i]);
}
LogFlush(options_.info_log);
}

void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld();
DeletionState deletion_state;
FindObsoleteFiles(deletion_state);
FindObsoleteFiles(deletion_state, true);
PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state);
}

// 1. Go through all archived files and
Expand Down Expand Up @@ -1091,7 +1072,8 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
return s;
}

Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state) {
mutex_.AssertHeld();
assert(imm_.size() != 0);

Expand Down Expand Up @@ -1149,22 +1131,13 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
}

MaybeScheduleLogDBDeployStats();
// TODO: if log deletion failed for any reason, we probably
// should store the file number in the shared state, and retry
// However, for now, PurgeObsoleteFiles will take care of that
// anyways.
bool should_delete_log = options_.purge_log_after_memtable_flush &&
!disable_delete_obsolete_files_;
if (should_delete_log) {
for (auto log_num : logs_to_delete) {
if (log_num < 0) {
continue;
}
mutex_.Unlock();
DeleteLogFile(log_num);
LogFlush(options_.info_log);
mutex_.Lock();
}

if (options_.purge_log_after_memtable_flush &&
!disable_delete_obsolete_files_) {
// add to deletion state
deletion_state.logdeletefiles.insert(deletion_state.logdeletefiles.end(),
logs_to_delete.begin(),
logs_to_delete.end());
}
}
return s;
Expand Down Expand Up @@ -1621,25 +1594,27 @@ void DBImpl::BGWorkCompaction(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}

Status DBImpl::BackgroundFlush(bool* madeProgress) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
while (stat.ok() &&
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile(madeProgress);
stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
}
return stat;
}

void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
DeletionState deletion_state;
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);

if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundFlush(&madeProgress);
Status s = BackgroundFlush(&madeProgress, deletion_state);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
Expand All @@ -1652,9 +1627,18 @@ void DBImpl::BackgroundCallFlush() {
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
// clean up all the files we might have created
FindObsoleteFiles(deletion_state, true);
}
}

// delete unnecessary files if any, this is done outside the mutex
if (deletion_state.HaveSomethingToDelete()) {
mutex_.Unlock();
PurgeObsoleteFiles(deletion_state);
mutex_.Lock();
}

bg_flush_scheduled_--;
if (madeProgress) {
MaybeScheduleFlushOrCompaction();
Expand Down Expand Up @@ -1690,17 +1674,16 @@ void DBImpl::BackgroundCallCompaction() {
LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
// clean up all the files we might have created
FindObsoleteFiles(deletion_state, true);
}
}

// delete unnecessary files if any, this is done outside the mutex
if (!deletion_state.live.empty()) {
if (deletion_state.HaveSomethingToDelete()) {
mutex_.Unlock();
PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state);
LogFlush(options_.info_log);
mutex_.Lock();

}

bg_compaction_scheduled_--;
Expand Down Expand Up @@ -1728,7 +1711,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d",
options_.max_background_compactions - bg_compaction_scheduled_);
Status stat = FlushMemTableToOutputFile(madeProgress);
Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
if (!stat.ok()) {
return stat;
}
Expand Down Expand Up @@ -1783,11 +1766,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact);
status = DoCompactionWork(compact, deletion_state);
CleanupCompaction(compact, status);
versions_->ReleaseCompactionFiles(c.get(), status);
c->ReleaseInputs();
FindObsoleteFiles(deletion_state);
versions_->GetAndFreeObsoleteFiles(&deletion_state.sstdeletefiles);
FindObsoleteFiles(deletion_state, false);
*madeProgress = true;
}
c.reset();
Expand Down Expand Up @@ -2044,7 +2028,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
return 0;
}

Status DBImpl::DoCompactionWork(CompactionState* compact) {
Status DBImpl::DoCompactionWork(CompactionState* compact,
DeletionState& deletion_state) {
assert(compact);
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log,
Expand Down Expand Up @@ -2120,7 +2105,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
LogFlush(options_.info_log);
mutex_.Lock();
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
FlushMemTableToOutputFile();
FlushMemTableToOutputFile(nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
Expand Down Expand Up @@ -3376,15 +3361,14 @@ Status DBImpl::DeleteFile(std::string name) {
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_);
if (status.ok()) {
FindObsoleteFiles(deletion_state);
versions_->GetAndFreeObsoleteFiles(&deletion_state.sstdeletefiles);
}
} // lock released here
LogFlush(options_.info_log);

if (status.ok()) {
// remove files outside the db-lock
PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state);
}
return status;
}
Expand Down
Loading

0 comments on commit 1510339

Please sign in to comment.