Skip to content

Commit

Permalink
Add support to flush multiple CFs atomically (facebook#4262)
Browse files Browse the repository at this point in the history
Summary:
Leverage existing `FlushJob` to implement atomic flush of multiple column families.

This PR depends on other PRs and is a subset of facebook#3752 . This PR itself is not sufficient in fulfilling atomic flush.
Pull Request resolved: facebook#4262

Differential Revision: D9283109

Pulled By: riversand963

fbshipit-source-id: 65401f913e4160b0a61c0be6cd02adc15dad28ed
  • Loading branch information
riversand963 authored and facebook-github-bot committed Oct 16, 2018
1 parent 32b4d4a commit e633983
Show file tree
Hide file tree
Showing 14 changed files with 1,242 additions and 101 deletions.
3 changes: 2 additions & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_sfm_(options.sst_file_manager == nullptr),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_) {
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_commit_in_progress_(false) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
Expand Down
22 changes: 18 additions & 4 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -908,18 +908,18 @@ class DBImpl : public DB {
// Argument required by background flush thread.
struct BGFlushArg {
BGFlushArg()
: cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
: cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {}
BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
SuperVersionContext* superversion_context)
: cfd_(cfd),
memtable_id_(memtable_id),
max_memtable_id_(max_memtable_id),
superversion_context_(superversion_context) {}

// Column family to flush.
ColumnFamilyData* cfd_;
// Maximum ID of memtable to flush. In this column family, memtables with
// IDs smaller than this value must be flushed before this flush completes.
uint64_t memtable_id_;
uint64_t max_memtable_id_;
// Pointer to a SuperVersionContext object. After flush completes, RocksDB
// installs a new superversion for the column family. This operation
// requires a SuperVersionContext object (currently embedded in JobContext).
Expand All @@ -932,6 +932,10 @@ class DBImpl : public DB {
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);

Status AtomicFlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);

// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
Expand Down Expand Up @@ -1579,6 +1583,16 @@ class DBImpl : public DB {
bool closed_;

ErrorHandler error_handler_;

// True if the DB is committing atomic flush.
// TODO (yanqin) the current impl assumes that the entire DB belongs to
// a single atomic flush group. In the future we need to add a new class
// (struct) similar to the following to make it more general.
// struct AtomicFlushGroup {
// bool commit_in_progress_;
// std::vector<MemTableList*> imm_lists;
// };
bool atomic_flush_commit_in_progress_;
};

extern Options SanitizeOptions(const std::string& db,
Expand Down
201 changes: 195 additions & 6 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,13 @@ Status DBImpl::FlushMemTableToOutputFile(
}
FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
&mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */);

FileMetaData file_meta;

Expand Down Expand Up @@ -169,7 +171,7 @@ Status DBImpl::FlushMemTableToOutputFile(
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
mutable_cf_options);
if (made_progress) {
*made_progress = 1;
*made_progress = true;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
Expand Down Expand Up @@ -225,6 +227,194 @@ Status DBImpl::FlushMemTablesToOutputFiles(
return s;
}

/*
* Atomically flushes multiple column families.
*
* For each column family, all memtables with ID smaller than or equal to the
* ID specified in bg_flush_args will be flushed. Only after all column
* families finish flush will this function commit to MANIFEST. If any of the
* column families are not flushed successfully, this function does not have
* any side-effect on the state of the database.
*/
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();

autovector<ColumnFamilyData*> cfds;
for (const auto& arg : bg_flush_args) {
cfds.emplace_back(arg.cfd_);
}

#ifndef NDEBUG
for (const auto cfd : cfds) {
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
}
#endif /* !NDEBUG */

SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);

auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
autovector<Directory*> distinct_output_dirs;
std::vector<FlushJob> jobs;
int num_cfs = static_cast<int>(cfds.size());
for (int i = 0; i < num_cfs; ++i) {
auto cfd = cfds[i];
Directory* data_dir = GetDataDir(cfd, 0U);

// Add to distinct output directories if eligible. Use linear search. Since
// the number of elements in the vector is not large, performance should be
// tolerable.
bool found = false;
for (const auto dir : distinct_output_dirs) {
if (dir == data_dir) {
found = true;
break;
}
}
if (!found) {
distinct_output_dirs.emplace_back(data_dir);
}

const MutableCFOptions& mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
jobs.emplace_back(
dbname_, cfds[i], immutable_db_options_, mutable_cf_options,
max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */);
jobs.back().PickMemTable();
}

autovector<FileMetaData> file_meta;
Status s;
assert(num_cfs == static_cast<int>(jobs.size()));

for (int i = 0; i != num_cfs; ++i) {
file_meta.emplace_back(FileMetaData());

#ifndef ROCKSDB_LITE
const MutableCFOptions& mutable_cf_options =
*cfds[i]->GetLatestMutableCFOptions();
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
#endif /* !ROCKSDB_LITE */
}

if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
s = SyncClosedLogs(job_context);
}

if (s.ok()) {
// TODO (yanqin): parallelize jobs with threads.
for (int i = 0; i != num_cfs; ++i) {
s = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
if (!s.ok()) {
break;
}
}
}

if (s.ok()) {
// Sync on all distinct output directories.
for (auto dir : distinct_output_dirs) {
if (dir != nullptr) {
s = dir->Fsync();
if (!s.ok()) {
break;
}
}
}

if (s.ok()) {
autovector<const autovector<MemTable*>*> mems_list;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
mems_list.emplace_back(&mems);
}
autovector<ColumnFamilyData*> all_cfds;
autovector<MemTableList*> imm_lists;
autovector<const MutableCFOptions*> mutable_cf_options_list;
for (auto cfd : *versions_->GetColumnFamilySet()) {
all_cfds.emplace_back(cfd);
imm_lists.emplace_back(cfd->imm());
mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
}

s = MemTableList::TryInstallMemtableFlushResults(
imm_lists, all_cfds, mutable_cf_options_list, mems_list,
&atomic_flush_commit_in_progress_, &logs_with_prep_tracker_,
versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
}
}

if (s.ok()) {
assert(num_cfs ==
static_cast<int>(job_context->superversion_contexts.size()));
for (int i = 0; i != num_cfs; ++i) {
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
*cfds[i]->GetLatestMutableCFOptions());
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
cfds[i]->current()->storage_info()->LevelSummary(&tmp));
}
if (made_progress) {
*made_progress = true;
}
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
for (int i = 0; i != num_cfs; ++i) {
NotifyOnFlushCompleted(cfds[i], &file_meta[i],
*cfds[i]->GetLatestMutableCFOptions(),
job_context->job_id, jobs[i].GetTableProperties());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached() &&
error_handler_.GetBGError().ok()) {
Status new_bg_error =
Status::SpaceLimit("Max allowed space was reached");
error_handler_.SetBGError(new_bg_error,
BackgroundErrorReason::kFlush);
}
}
}
#endif // ROCKSDB_LITE
}

if (!s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber());
jobs[i].Cancel();
}
if (!s.IsShutdownInProgress()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}

return s;
}

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
Expand Down Expand Up @@ -983,7 +1173,6 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
return s;
}


Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
Status s;
WriteContext context;
Expand Down
16 changes: 10 additions & 6 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}


FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions env_options, VersionSet* versions,
const uint64_t* max_memtable_id,
const EnvOptions& env_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
Expand All @@ -98,11 +98,13 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats)
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
max_memtable_id_(max_memtable_id),
env_options_(env_options),
versions_(versions),
db_mutex_(db_mutex),
Expand All @@ -118,6 +120,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats),
sync_output_directory_(sync_output_directory),
write_manifest_(write_manifest),
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false) {
Expand Down Expand Up @@ -162,7 +166,7 @@ void FlushJob::PickMemTable() {
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
if (mems_.empty()) {
return;
}
Expand Down Expand Up @@ -226,7 +230,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,

if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
} else {
} else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->TryInstallMemtableFlushResults(
Expand Down Expand Up @@ -373,7 +377,7 @@ Status FlushJob::WriteLevel0Table() {
s.ToString().c_str(),
meta_.marked_for_compaction ? " (needs compaction)" : "");

if (s.ok() && output_file_directory_ != nullptr) {
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
Expand Down
Loading

0 comments on commit e633983

Please sign in to comment.