Redesign pending_outputs_
Summary:
Here's a prototype of the pending_outputs_ redesign. This way, we don't have to expose pending_outputs_ to other classes (CompactionJob, FlushJob, MemtableList); DBImpl takes care of it.

Still have to write some comments, but should be good enough to start the discussion.
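
For discussion, here is a minimal sketch of the protocol this diff introduces. It is an illustration only, not the actual DBImpl code: the class below is standalone and guards itself with its own mutex, whereas in the real change the list is a DBImpl member protected by the DB mutex.

```cpp
#include <cstdint>
#include <iterator>
#include <limits>
#include <list>
#include <mutex>

// Sketch of the new pending_outputs_ protocol (simplified; in the actual
// change this state lives in DBImpl and is protected by the DB mutex).
class PendingOutputs {
 public:
  // A background job calls this before allocating any new file numbers.
  // It records the current file-number watermark; every file the job creates
  // afterwards has a number >= this value and is therefore protected.
  std::list<uint64_t>::iterator Capture(uint64_t current_next_file_number) {
    std::lock_guard<std::mutex> lock(mu_);
    pending_.push_back(current_next_file_number);
    return std::prev(pending_.end());
  }

  // The job calls this when it finishes, whether it succeeded or failed.
  void Release(std::list<uint64_t>::iterator elem) {
    std::lock_guard<std::mutex> lock(mu_);
    pending_.erase(elem);
  }

  // Obsolete-file deletion may only remove table files whose number is
  // strictly below this bound.
  uint64_t MinPendingOutput() {
    std::lock_guard<std::mutex> lock(mu_);
    // File numbers grow monotonically, so the list is always sorted and the
    // front element is the minimum.
    return pending_.empty() ? std::numeric_limits<uint64_t>::max()
                            : pending_.front();
  }

 private:
  std::mutex mu_;
  std::list<uint64_t> pending_;  // insertion order == sorted order
};
```

Using a std::list means Release() is O(1) via the iterator returned by Capture(), and erasing one element never invalidates the iterators held by other in-flight jobs.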

Test Plan: make check, will also run stress test

Reviewers: ljin, sdong, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28353
igorcanadi committed Nov 7, 2014
1 parent ec101cd commit 53af5d8
Showing 13 changed files with 146 additions and 90 deletions.
28 changes: 6 additions & 22 deletions db/compaction_job.cc
@@ -205,10 +205,9 @@ CompactionJob::CompactionJob(
Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
VersionSet* versions, port::Mutex* db_mutex,
std::atomic<bool>* shutting_down, FileNumToPathIdMap* pending_outputs,
LogBuffer* log_buffer, Directory* db_directory, Statistics* stats,
SnapshotList* snapshots, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats, SnapshotList* snapshots,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback)
: compact_(new CompactionState(compaction)),
compaction_stats_(1),
@@ -219,7 +218,6 @@ CompactionJob::CompactionJob(
versions_(versions),
db_mutex_(db_mutex),
shutting_down_(shutting_down),
pending_outputs_(pending_outputs),
log_buffer_(log_buffer),
db_directory_(db_directory),
stats_(stats),
@@ -469,10 +467,6 @@ Status CompactionJob::Install(Status status) {
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), compaction_stats_);

// if there were any unused file number (mostly in case of
// compaction error), free up the entry from pending_putputs
ReleaseCompactionUnusedFileNumbers();

if (status.ok()) {
status = InstallCompactionResults();
}
@@ -511,8 +505,6 @@ void CompactionJob::AllocateCompactionOutputFileNumbers() {
int filesNeeded = compact_->compaction->num_input_files(1);
for (int i = 0; i < std::max(filesNeeded, 1); i++) {
uint64_t file_number = versions_->NewFileNumber();
pending_outputs_->insert(
{file_number, compact_->compaction->GetOutputPathId()});
compact_->allocated_file_numbers.push_back(file_number);
}
}
@@ -1041,14 +1033,6 @@ void CompactionJob::RecordCompactionIOStats() {
IOSTATS_RESET(bytes_written);
}

// Frees up unused file number.
void CompactionJob::ReleaseCompactionUnusedFileNumbers() {
db_mutex_->AssertHeld();
for (const auto file_number : compact_->allocated_file_numbers) {
pending_outputs_->erase(file_number);
}
}

Status CompactionJob::OpenCompactionOutputFile() {
assert(compact_ != nullptr);
assert(compact_->builder == nullptr);
@@ -1061,9 +1045,10 @@ Status CompactionJob::OpenCompactionOutputFile() {
compact_->allocated_file_numbers.pop_front();
} else {
db_mutex_->Lock();
// TODO(icanadi) make Versions::next_file_number_ atomic and remove db_lock
// around here. Once we do that, AllocateCompactionOutputFileNumbers() will
// not be needed.
file_number = versions_->NewFileNumber();
pending_outputs_->insert(
{file_number, compact_->compaction->GetOutputPathId()});
db_mutex_->Unlock();
}
// Make the output file
@@ -1112,7 +1097,6 @@ void CompactionJob::CleanupCompaction(Status status) {
}
for (size_t i = 0; i < compact_->outputs.size(); i++) {
const CompactionState::Output& out = compact_->outputs[i];
pending_outputs_->erase(out.number);

// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.
9 changes: 3 additions & 6 deletions db/compaction_job.h
@@ -57,10 +57,9 @@ class CompactionJob {
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
FileNumToPathIdMap* pending_outputs, LogBuffer* log_buffer,
Directory* db_directory, Statistics* stats,
SnapshotList* snapshot_list, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
LogBuffer* log_buffer, Directory* db_directory,
Statistics* stats, SnapshotList* snapshot_list,
bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback);

~CompactionJob() { assert(compact_ == nullptr); }
@@ -92,7 +91,6 @@ class CompactionJob {
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
void RecordCompactionIOStats();
void ReleaseCompactionUnusedFileNumbers();
Status OpenCompactionOutputFile();
void CleanupCompaction(Status status);

@@ -115,7 +113,6 @@ class CompactionJob {
VersionSet* versions_;
port::Mutex* db_mutex_;
std::atomic<bool>* shutting_down_;
FileNumToPathIdMap* pending_outputs_;
LogBuffer* log_buffer_;
Directory* db_directory_;
Statistics* stats_;
56 changes: 45 additions & 11 deletions db/db_impl.cc
@@ -443,8 +443,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}

// don't delete live files
for (auto pair : pending_outputs_) {
job_context->sst_live.emplace_back(pair.first, pair.second, 0);
if (pending_outputs_.size()) {
job_context->min_pending_output = *pending_outputs_.begin();
} else {
// delete all of them
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
}
versions_->AddLiveFiles(&job_context->sst_live);

@@ -567,7 +570,10 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
keep = (number >= state.manifest_file_number);
break;
case kTableFile:
keep = (sst_live_map.find(number) != sst_live_map.end());
// If the second condition is not there, this makes
// DontDeletePendingOutputs fail
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
number >= state.min_pending_output;
break;
case kTempFile:
// Any temp files that are currently being written to must
@@ -981,7 +987,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
Expand Down Expand Up @@ -1013,7 +1020,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
}
pending_outputs_.erase(meta.fd.GetNumber());
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
@@ -1044,9 +1051,9 @@ Status DBImpl::FlushMemTableToOutputFile(

FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
env_options_, versions_.get(), &mutex_, &shutting_down_,
&pending_outputs_, snapshots_.GetNewest(), job_context,
log_buffer, db_directory_.get(),
GetCompressionFlush(*cfd->ioptions()), stats_);
snapshots_.GetNewest(), job_context, log_buffer,
db_directory_.get(), GetCompressionFlush(*cfd->ioptions()),
stats_);

Status s = flush_job.Run();

@@ -1550,6 +1557,9 @@ void DBImpl::BackgroundCallFlush() {
{
MutexLock l(&mutex_);

auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();

Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
@@ -1573,6 +1583,8 @@
}
}

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

// If !s.ok(), this means that Flush failed. In that case, we want
// to delete all obsolete files and we force FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok());
@@ -1616,6 +1628,10 @@ void DBImpl::BackgroundCallCompaction() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);

auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();

assert(bg_compaction_scheduled_);
Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
@@ -1640,6 +1656,8 @@
}
}

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

// If !s.ok(), this means that Compaction failed. In that case, we want
// to delete all obsolete files we might have created and we force
// FindObsoleteFiles(). This is because job_context does not
@@ -1848,9 +1866,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &mutex_, &shutting_down_, &pending_outputs_,
log_buffer, db_directory_.get(), stats_, &snapshots_,
IsSnapshotSupported(), table_cache_, std::move(yield_callback));
versions_.get(), &mutex_, &shutting_down_, log_buffer,
db_directory_.get(), stats_, &snapshots_, IsSnapshotSupported(),
table_cache_, std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
@@ -2968,6 +2986,22 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
}
}

std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
// We need to remember the iterator of our insert, because after the
// background job is done, we need to remove that element from
// pending_outputs_.
pending_outputs_.push_back(versions_->current_next_file_number());
auto pending_outputs_inserted_elem = pending_outputs_.end();
--pending_outputs_inserted_elem;
return pending_outputs_inserted_elem;
}

void DBImpl::ReleaseFileNumberFromPendingOutputs(
std::list<uint64_t>::iterator v) {
pending_outputs_.erase(v);
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetUpdatesSince(
SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
33 changes: 29 additions & 4 deletions db/db_impl.h
@@ -12,6 +12,7 @@
#include <deque>
#include <limits>
#include <set>
#include <list>
#include <utility>
#include <vector>
#include <string>
@@ -265,6 +266,24 @@ class DBImpl : public DB {
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();

// Background process needs to call
// auto x = CaptureCurrentFileNumberInPendingOutputs()
// <do something>
// ReleaseFileNumberFromPendingOutputs(x)
// This will protect any temporary files created while <do something> is
// executing from being deleted.
// -----------
// This function will capture the current file number and append it to
// pending_outputs_. This will prevent any background process from deleting
// any file created after this point.
std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs();
// This function should be called with the result of
// CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file
// created between the calls to CaptureCurrentFileNumberInPendingOutputs() and
// ReleaseFileNumberFromPendingOutputs() can now be deleted (if it is not live
// and not blocked by any other pending_outputs_ entry)
void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);

// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
@@ -390,10 +409,16 @@

SnapshotList snapshots_;

// Set of table files to protect from deletion because they are
// part of ongoing compactions.
// map from pending file number ID to their path IDs.
FileNumToPathIdMap pending_outputs_;
// For each background job, pending_outputs_ keeps the current file number at
// the time that background job started.
// FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has
// number bigger than any of the file numbers in pending_outputs_. Since file
// numbers grow monotonically, this also means that pending_outputs_ is always
// sorted. After a background job is done executing, its file number is
// deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean
// it up.
// State is protected by the DB mutex.
std::list<uint64_t> pending_outputs_;

// At least one compaction or flush job is pending but not yet scheduled
// because of the max background thread limit.
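The header comment above describes the Capture/Release protocol; to make the deletion rule concrete, here is a short hedged walk-through built on the PendingOutputs sketch from the commit summary (the file numbers are hypothetical, and this is not the actual DBImpl API):

```cpp
#include <cassert>
#include <limits>

// Hypothetical scenario illustrating the watermark rule; reuses the
// PendingOutputs sketch from the commit summary above.
void WatermarkExample(PendingOutputs& pending) {
  // A flush job starts while the next file number is 10.
  auto elem = pending.Capture(/*current_next_file_number=*/10);

  // The job creates 000010.sst and 000011.sst. A concurrent
  // FindObsoleteFiles()/PurgeObsoleteFiles() pass may only delete table files
  // with number < MinPendingOutput() == 10, so both half-written outputs
  // survive even though no Version references them yet.
  assert(pending.MinPendingOutput() == 10);

  // Once the job installs its results (or fails and cleans up after itself),
  // the watermark is released and the next purge can reclaim any leftovers.
  pending.Release(elem);
  assert(pending.MinPendingOutput() == std::numeric_limits<uint64_t>::max());
}
```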
39 changes: 39 additions & 0 deletions db/db_test.cc
@@ -165,6 +165,8 @@ class SpecialEnv : public EnvWrapper {

std::atomic<uint32_t> non_writable_count_;

std::function<void()>* table_write_callback_;

explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release);
@@ -181,6 +183,8 @@
non_writeable_rate_ = 0;
new_writable_count_ = 0;
non_writable_count_ = 0;
periodic_non_writable_ = 0;
table_write_callback_ = nullptr;
}

Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@@ -196,6 +200,9 @@
base_(std::move(base)) {
}
Status Append(const Slice& data) {
if (env_->table_write_callback_) {
(*env_->table_write_callback_)();
}
if (env_->drop_writes_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
@@ -9042,6 +9049,38 @@ TEST(DBTest, DynamicMiscOptions) {
assert_reseek_count(300, 1);
}

TEST(DBTest, DontDeletePendingOutputs) {
Options options;
options.env = env_;
options.create_if_missing = true;
DestroyAndReopen(options);

// Every time we write to a table file, call FOF/POF with full DB scan. This
// will make sure our pending_outputs_ protection works correctly
std::function<void()> purge_obsolete_files_function = [&]() {
JobContext job_context;
dbfull()->TEST_LockMutex();
dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
dbfull()->TEST_UnlockMutex();
dbfull()->PurgeObsoleteFiles(job_context);
};

env_->table_write_callback_ = &purge_obsolete_files_function;

for (int i = 0; i < 2; ++i) {
ASSERT_OK(Put("a", "begin"));
ASSERT_OK(Put("z", "end"));
ASSERT_OK(Flush());
}

// If pending output guard does not work correctly, PurgeObsoleteFiles() will
// delete the file that Compaction is trying to create, causing this: error
// db/db_test.cc:975: IO error:
// /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory
Compact("a", "b");
}


} // namespace rocksdb

int main(int argc, char** argv) {
3 changes: 0 additions & 3 deletions db/filename.h
@@ -36,9 +36,6 @@ enum FileType {
kIdentityFile
};

// map from file number to path ID.
typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;

// Return the name of the log file with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".
