Skip to content

Commit

Permalink
Move rate_limiter, write buffering, most perf context instrumentation…
Browse files Browse the repository at this point in the history
… and most random kill out of Env

Summary: We want to keep Env a think layer for better portability. Less platform dependent codes should be moved out of Env. In this patch, I create a wrapper of file readers and writers, and put rate limiting, write buffering, as well as most perf context instrumentation and random kill out of Env. It will make it easier to maintain multiple Env in the future.

Test Plan: Run all existing unit tests.

Reviewers: anthony, kradhakrishnan, IslamAbdelRahman, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D42321
  • Loading branch information
siying committed Jul 17, 2015
1 parent 5ec829b commit 6e9fbeb
Show file tree
Hide file tree
Showing 78 changed files with 1,079 additions and 714 deletions.
38 changes: 20 additions & 18 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/file_reader_writer.h"
#include "util/iostats_context_imp.h"
#include "util/thread_status_util.h"
#include "util/stop_watch.h"
Expand All @@ -34,7 +35,7 @@ TableBuilder* NewTableBuilder(
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
Expand Down Expand Up @@ -72,16 +73,22 @@ Status BuildTable(
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
if (iter->Valid()) {
unique_ptr<WritableFile> file;
s = env->NewWritableFile(fname, &file, env_options);
if (!s.ok()) {
return s;
}
file->SetIOPriority(io_priority);
TableBuilder* builder;
unique_ptr<WritableFileWriter> file_writer;
{
unique_ptr<WritableFile> file;
s = env->NewWritableFile(fname, &file, env_options);
if (!s.ok()) {
return s;
}
file->SetIOPriority(io_priority);

TableBuilder* builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
file.get(), compression, compression_opts);
file_writer.reset(new WritableFileWriter(std::move(file), env_options));

builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
file_writer.get(), compression, compression_opts);
}

{
// the first key is the smallest key
Expand Down Expand Up @@ -232,16 +239,11 @@ Status BuildTable(

// Finish and check for file errors
if (s.ok() && !ioptions.disable_data_sync) {
if (ioptions.use_fsync) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
s = file->Fsync();
} else {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
s = file->Sync();
}
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
file_writer->Sync(ioptions.use_fsync);
}
if (s.ok()) {
s = file->Close();
s = file_writer->Close();
}

if (s.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class Iterator;
class TableCache;
class VersionEdit;
class TableBuilder;
class WritableFile;
class WritableFileWriter;

TableBuilder* NewTableBuilder(
const ImmutableCFOptions& options,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false);

Expand Down
22 changes: 10 additions & 12 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
Expand Down Expand Up @@ -71,7 +72,7 @@ struct CompactionJob::CompactionState {
std::vector<Output> outputs;

// State kept for output being generated
std::unique_ptr<WritableFile> outfile;
std::unique_ptr<WritableFileWriter> outfile;
std::unique_ptr<TableBuilder> builder;

uint64_t total_bytes;
Expand Down Expand Up @@ -662,13 +663,8 @@ Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) {

// Finish and check for file errors
if (s.ok() && !db_options_.disableDataSync) {
if (db_options_.use_fsync) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = compact_->outfile->Fsync();
} else {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = compact_->outfile->Sync();
}
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = compact_->outfile->Sync(db_options_.use_fsync);
}
if (s.ok()) {
s = compact_->outfile->Close();
Expand Down Expand Up @@ -799,10 +795,10 @@ Status CompactionJob::OpenCompactionOutputFile() {
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
// Make the output file
unique_ptr<WritableFile> writable_file;
std::string fname = TableFileName(db_options_.db_paths, file_number,
compact_->compaction->output_path_id());
Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);

Status s = env_->NewWritableFile(fname, &writable_file, env_options_);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
Expand All @@ -820,9 +816,11 @@ Status CompactionJob::OpenCompactionOutputFile() {
out.smallest_seqno = out.largest_seqno = 0;

compact_->outputs.push_back(out);
compact_->outfile->SetIOPriority(Env::IO_LOW);
compact_->outfile->SetPreallocationBlockSize(
writable_file->SetIOPriority(Env::IO_LOW);
writable_file->SetPreallocationBlockSize(
static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));
compact_->outfile.reset(
new WritableFileWriter(std::move(writable_file), env_options_));

ColumnFamilyData* cfd = compact_->compaction->column_family_data();
bool skip_filters = false;
Expand Down
5 changes: 4 additions & 1 deletion db/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/db.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
Expand Down Expand Up @@ -166,8 +167,10 @@ class CompactionJobTest : public testing::Test {
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
{
log::Writer log(std::move(file));
log::Writer log(std::move(file_writer));
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
Expand Down
84 changes: 51 additions & 33 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/db_info_dumper.h"
#include "util/file_reader_writer.h"
#include "util/file_util.h"
#include "util/hash_skiplist_rep.h"
#include "util/hash_linklist_rep.h"
Expand Down Expand Up @@ -384,18 +385,22 @@ Status DBImpl::NewDB() {
new_db.SetNextFile(2);
new_db.SetLastSequence(0);

Status s;

Log(InfoLogLevel::INFO_LEVEL,
db_options_.info_log, "Creating manifest 1 \n");
const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
if (!s.ok()) {
return s;
}
file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size);
{
log::Writer log(std::move(file));
unique_ptr<WritableFile> file;
EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
s = env_->NewWritableFile(manifest, &file, env_options);
if (!s.ok()) {
return s;
}
file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer));
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
Expand Down Expand Up @@ -1013,17 +1018,21 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
versions_->MarkFileNumberUsedDuringRecovery(log_number);
// Open the log file
std::string fname = LogFileName(db_options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
MaybeIgnoreError(&status);
unique_ptr<SequentialFileReader> file_reader;
{
unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
return status;
} else {
// Fail with one log file, but that's ok.
// Try next one.
continue;
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
} else {
// Fail with one log file, but that's ok.
// Try next one.
continue;
}
}
file_reader.reset(new SequentialFileReader(std::move(file)));
}

// Create the log reader.
Expand All @@ -1042,7 +1051,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(std::move(file), &reporter, true /*checksum*/,
log::Reader reader(std::move(file_reader), &reporter, true /*checksum*/,
0 /*initial_offset*/);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
Expand Down Expand Up @@ -3490,11 +3499,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (status.ok() && write_options.sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
if (db_options_.use_fsync) {
status = log_->file()->Fsync();
} else {
status = log_->file()->Sync();
}
status = log_->file()->Sync(db_options_.use_fsync);
if (status.ok() && !log_dir_synced_) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
Expand Down Expand Up @@ -3624,15 +3629,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
Status s;
{
if (creating_new_log) {
EnvOptions opt_env_opt =
env_->OptimizeForLogWrite(env_options_, db_options_);
s = env_->NewWritableFile(
LogFileName(db_options_.wal_dir, new_log_number), &lfile,
env_->OptimizeForLogWrite(env_options_, db_options_));
opt_env_opt);
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(
1.1 * mutable_cf_options.write_buffer_size);
new_log = new log::Writer(std::move(lfile));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt));
new_log = new log::Writer(std::move(file_writer));
log_dir_synced_ = false;
}
}
Expand Down Expand Up @@ -4031,20 +4040,26 @@ Status DBImpl::CheckConsistency() {

Status DBImpl::GetDbIdentity(std::string& identity) const {
std::string idfilename = IdentityFileName(dbname_);
unique_ptr<SequentialFile> idfile;
const EnvOptions soptions;
Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
if (!s.ok()) {
return s;
unique_ptr<SequentialFileReader> id_file_reader;
Status s;
{
unique_ptr<SequentialFile> idfile;
s = env_->NewSequentialFile(idfilename, &idfile, soptions);
if (!s.ok()) {
return s;
}
id_file_reader.reset(new SequentialFileReader(std::move(idfile)));
}

uint64_t file_size;
s = env_->GetFileSize(idfilename, &file_size);
if (!s.ok()) {
return s;
}
char* buffer = reinterpret_cast<char*>(alloca(file_size));
Slice id;
s = idfile->Read(static_cast<size_t>(file_size), &id, buffer);
s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -4176,14 +4191,17 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
EnvOptions soptions(db_options);
EnvOptions opt_env_options =
impl->db_options_.env->OptimizeForLogWrite(soptions, impl->db_options_);
s = impl->db_options_.env->NewWritableFile(
LogFileName(impl->db_options_.wal_dir, new_log_number), &lfile,
impl->db_options_.env->OptimizeForLogWrite(soptions,
impl->db_options_));
opt_env_options);
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_options));
impl->log_.reset(new log::Writer(std::move(file_writer)));

// set column family handles
for (auto cf : column_families) {
Expand Down
13 changes: 7 additions & 6 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "table/mock_table.h"
#include "table/plain_table_factory.h"
#include "util/db_test_util.h"
#include "util/file_reader_writer.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
Expand Down Expand Up @@ -6008,7 +6009,9 @@ class RecoveryTestHelper {
std::string fname = LogFileName(test->dbname_, current_log_number);
unique_ptr<WritableFile> file;
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
current_log_writer.reset(new log::Writer(std::move(file)));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
current_log_writer.reset(new log::Writer(std::move(file_writer)));

for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString(count++);
Expand Down Expand Up @@ -7231,8 +7234,7 @@ TEST_F(DBTest, RateLimitingTest) {
}
elapsed = env_->NowMicros() - start;
Close();
ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() ==
env_->bytes_written_);
ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio);
ASSERT_TRUE(ratio < 0.8);
Expand All @@ -7251,11 +7253,10 @@ TEST_F(DBTest, RateLimitingTest) {
}
elapsed = env_->NowMicros() - start;
Close();
ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() ==
env_->bytes_written_);
ASSERT_EQ(options.rate_limiter->GetTotalBytesThrough(), env_->bytes_written_);
ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate;
fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio);
ASSERT_TRUE(ratio < 0.6);
ASSERT_LT(ratio, 0.6);
}

namespace {
Expand Down
4 changes: 2 additions & 2 deletions db/fault_injection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ struct FileState {

} // anonymous namespace

// A wrapper around WritableFile which informs another Env whenever this file
// A wrapper around WritableFileWriter* file
// is written to or sync'ed.
class TestWritableFile : public WritableFile {
public:
Expand Down Expand Up @@ -197,7 +197,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
Status s = target()->NewWritableFile(fname, result, soptions);
if (s.ok()) {
result->reset(new TestWritableFile(fname, std::move(*result), this));
// WritableFile doesn't append to files, so if the same file is opened
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
Expand Down
Loading

0 comments on commit 6e9fbeb

Please sign in to comment.