Skip to content

Commit

Permalink
Make writable_file_max_buffer_size dynamic
Browse files Browse the repository at this point in the history
Summary:
The DBOptions::writable_file_max_buffer_size can be changed dynamically.
Closes facebook#3053

Differential Revision: D6152720

Pulled By: shligit

fbshipit-source-id: aa0c0cfcfae6a54eb17faadb148d904797c68681
  • Loading branch information
shligit authored and facebook-github-bot committed Oct 31, 2017
1 parent c1be8d8 commit 33c7d4c
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 23 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case.

### New Features
* `DBOptions::writable_file_max_buffer_size` can now be changed dynamically.
* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file.
* Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound.
* Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`.
Expand Down
5 changes: 5 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,11 @@ void rocksdb_options_set_bytes_per_sync(
opt->rep.bytes_per_sync = v;
}

void rocksdb_options_set_writable_file_max_buffer_size(rocksdb_options_t* opt,
uint64_t v) {
opt->rep.writable_file_max_buffer_size = v;
}

void rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.allow_concurrent_memtable_write = v;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ Status DBImpl::SetDBOptions(
env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite(
env_options_for_compaction_,
immutable_db_options_);
versions_->ChangeEnvOptions(mutable_db_options_);
write_thread_.EnterUnbatched(&w, &mutex_);
if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
Status purge_wal_status = SwitchWAL(&write_context);
Expand Down
52 changes: 52 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,58 @@ TEST_F(DBOptionsTest, SetWalBytesPerSync) {
ASSERT_GT(low_bytes_per_sync, counter);
}

TEST_F(DBOptionsTest, WritableFileMaxBufferSize) {
Options options;
options.create_if_missing = true;
options.writable_file_max_buffer_size = 1024 * 1024;
options.level0_file_num_compaction_trigger = 3;
options.max_manifest_file_size = 1;
options.env = env_;
int buffer_size = 1024 * 1024;
Reopen(options);
ASSERT_EQ(buffer_size,
dbfull()->GetDBOptions().writable_file_max_buffer_size);

std::atomic<int> match_cnt(0);
std::atomic<int> unmatch_cnt(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::WritableFileWriter:0", [&](void* arg) {
int value = static_cast<int>(reinterpret_cast<uintptr_t>(arg));
if (value == buffer_size) {
match_cnt++;
} else {
unmatch_cnt++;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
int i = 0;
for (; i < 3; i++) {
ASSERT_OK(Put("foo", ToString(i)));
ASSERT_OK(Put("bar", ToString(i)));
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(unmatch_cnt, 0);
ASSERT_GE(match_cnt, 11);

buffer_size = 512 * 1024;
match_cnt = 0;
unmatch_cnt = 0;
ASSERT_OK(
dbfull()->SetDBOptions({{"writable_file_max_buffer_size", "524288"}}));
ASSERT_EQ(buffer_size,
dbfull()->GetDBOptions().writable_file_max_buffer_size);
i = 0;
for (; i < 3; i++) {
ASSERT_OK(Put("foo", ToString(i)));
ASSERT_OK(Put("bar", ToString(i)));
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(unmatch_cnt, 0);
ASSERT_GE(match_cnt, 11);
}

TEST_F(DBOptionsTest, SetOptionsAndReopen) {
Random rnd(1044);
auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd);
Expand Down
28 changes: 16 additions & 12 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
auto table_cache = cfd_->table_cache();
auto ioptions = cfd_->ioptions();
Status s = table_cache->GetTableProperties(
vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
env_options_, cfd_->internal_comparator(), file_meta->fd,
tp, true /* no io */);
if (s.ok()) {
return s;
Expand All @@ -599,7 +599,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
}
s = ioptions->env->NewRandomAccessFile(file_name, &file, vset_->env_options_);
s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -711,7 +711,7 @@ size_t Version::GetMemoryUsageByTableReaders() {
for (auto& file_level : storage_info_.level_files_brief_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
vset_->env_options_, cfd_->internal_comparator(),
env_options_, cfd_->internal_comparator(),
file_level.files[i].fd);
}
}
Expand Down Expand Up @@ -936,7 +936,7 @@ VersionStorageInfo::VersionStorageInfo(
}

Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
uint64_t version_number)
const EnvOptions& env_opt, uint64_t version_number)
: env_(vset->env_),
cfd_(column_family_data),
info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
Expand All @@ -959,6 +959,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
next_(this),
prev_(this),
refs_(0),
env_options_(env_opt),
version_number_(version_number) {}

void Version::Get(const ReadOptions& read_options, const LookupKey& k,
Expand Down Expand Up @@ -2532,7 +2533,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
LogAndApplyCFHelper(w.edit_list.front());
batch_edits.push_back(w.edit_list.front());
} else {
v = new Version(column_family_data, this, current_version_number_++);
v = new Version(column_family_data, this, env_options_,
current_version_number_++);
builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
auto* builder = builder_guard->version_builder();
for (const auto& writer : manifest_writers_) {
Expand Down Expand Up @@ -2577,7 +2579,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued.
{

EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
mu->Unlock();

TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
Expand All @@ -2599,7 +2601,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
unique_ptr<WritableFile> descriptor_file;
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
s = NewWritableFile(
env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, opt_env_opts);
Expand Down Expand Up @@ -3064,7 +3065,8 @@ Status VersionSet::Recover(
false /* prefetch_index_and_filter_in_cache */);
}

Version* v = new Version(cfd, this, current_version_number_++);
Version* v =
new Version(cfd, this, env_options_, current_version_number_++);
builder->SaveTo(v->storage_info());

// Install recovered version
Expand Down Expand Up @@ -3422,7 +3424,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder();

Version* v = new Version(cfd, this, current_version_number_++);
Version* v =
new Version(cfd, this, env_options_, current_version_number_++);
builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);

Expand Down Expand Up @@ -3634,7 +3637,7 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
// approximate offset of "key" within the table.
TableReader* table_reader_ptr;
InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd,
ReadOptions(), v->env_options_, v->cfd_->internal_comparator(), f.fd,
nullptr /* range_del_agg */, &table_reader_ptr);
if (table_reader_ptr != nullptr) {
result = table_reader_ptr->ApproximateOffsetOf(key);
Expand Down Expand Up @@ -3865,15 +3868,16 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
assert(edit->is_column_family_add_);

Version* dummy_versions = new Version(nullptr, this);
Version* dummy_versions = new Version(nullptr, this, env_options_);
// Ref() dummy version once so that later we can call Unref() to delete it
// by avoiding calling "delete" explicitly (~Version is private)
dummy_versions->Ref();
auto new_cfd = column_family_set_->CreateColumnFamily(
edit->column_family_name_, edit->column_family_, dummy_versions,
cf_options);

Version* v = new Version(new_cfd, this, current_version_number_++);
Version* v =
new Version(new_cfd, this, env_options_, current_version_number_++);

// Fill level target base information.
v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
Expand Down
10 changes: 8 additions & 2 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,14 @@ class Version {
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
const EnvOptions env_options_;

// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
uint64_t version_number_;

Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
Version(ColumnFamilyData* cfd, VersionSet* vset, const EnvOptions& env_opt,
uint64_t version_number = 0);

~Version();

Expand Down Expand Up @@ -844,6 +846,10 @@ class VersionSet {

ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
const EnvOptions& env_options() { return env_options_; }
void ChangeEnvOptions(const MutableDBOptions& new_options) {
env_options_.writable_file_max_buffer_size =
new_options.writable_file_max_buffer_size;
}

static uint64_t GetNumLiveVersions(Version* dummy_versions);

Expand Down Expand Up @@ -908,7 +914,7 @@ class VersionSet {
std::vector<std::string> obsolete_manifests_;

// env options for all reads and writes except compactions
const EnvOptions& env_options_;
EnvOptions env_options_;

// env options used for compactions. This is a copy of
// env_options_ but with readaheads set to readahead_compactions_.
Expand Down
2 changes: 2 additions & 0 deletions env/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync;
optimized_env_options.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size;
return optimized_env_options;
}

Expand Down
2 changes: 2 additions & 0 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ class PosixEnv : public Env {
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
optimized.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size;
return optimized;
}

Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_bytes_per_sync(
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_bytes_per_sync(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_writable_file_max_buffer_size(rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t*,
unsigned char);
extern ROCKSDB_LIBRARY_API void
Expand Down
9 changes: 5 additions & 4 deletions options/db_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
options.new_table_reader_for_compaction_inputs),
compaction_readahead_size(options.compaction_readahead_size),
random_access_max_buffer_size(options.random_access_max_buffer_size),
writable_file_max_buffer_size(options.writable_file_max_buffer_size),
use_adaptive_mutex(options.use_adaptive_mutex),
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
Expand Down Expand Up @@ -175,9 +174,6 @@ void ImmutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt,
random_access_max_buffer_size);
ROCKS_LOG_HEADER(
log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt,
writable_file_max_buffer_size);
ROCKS_LOG_HEADER(log, " Options.use_adaptive_mutex: %d",
use_adaptive_mutex);
ROCKS_LOG_HEADER(log, " Options.rate_limiter: %p",
Expand Down Expand Up @@ -230,6 +226,7 @@ MutableDBOptions::MutableDBOptions()
base_background_compactions(-1),
max_background_compactions(-1),
avoid_flush_during_shutdown(false),
writable_file_max_buffer_size(1024 * 1024),
delayed_write_rate(2 * 1024U * 1024U),
max_total_wal_size(0),
delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000),
Expand All @@ -243,6 +240,7 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions),
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
writable_file_max_buffer_size(options.writable_file_max_buffer_size),
delayed_write_rate(options.delayed_write_rate),
max_total_wal_size(options.max_total_wal_size),
delete_obsolete_files_period_micros(
Expand All @@ -259,6 +257,9 @@ void MutableDBOptions::Dump(Logger* log) const {
max_background_compactions);
ROCKS_LOG_HEADER(log, " Options.avoid_flush_during_shutdown: %d",
avoid_flush_during_shutdown);
ROCKS_LOG_HEADER(
log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt,
writable_file_max_buffer_size);
ROCKS_LOG_HEADER(log, " Options.delayed_write_rate : %" PRIu64,
delayed_write_rate);
ROCKS_LOG_HEADER(log, " Options.max_total_wal_size: %" PRIu64,
Expand Down
2 changes: 1 addition & 1 deletion options/db_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ struct ImmutableDBOptions {
bool new_table_reader_for_compaction_inputs;
size_t compaction_readahead_size;
size_t random_access_max_buffer_size;
size_t writable_file_max_buffer_size;
bool use_adaptive_mutex;
std::vector<std::shared_ptr<EventListener>> listeners;
bool enable_thread_tracking;
Expand Down Expand Up @@ -93,6 +92,7 @@ struct MutableDBOptions {
int base_background_compactions;
int max_background_compactions;
bool avoid_flush_during_shutdown;
size_t writable_file_max_buffer_size;
uint64_t delayed_write_rate;
uint64_t max_total_wal_size;
uint64_t delete_obsolete_files_period_micros;
Expand Down
2 changes: 1 addition & 1 deletion options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.random_access_max_buffer_size =
immutable_db_options.random_access_max_buffer_size;
options.writable_file_max_buffer_size =
immutable_db_options.writable_file_max_buffer_size;
mutable_db_options.writable_file_max_buffer_size;
options.use_adaptive_mutex = immutable_db_options.use_adaptive_mutex;
options.listeners = immutable_db_options.listeners;
options.enable_thread_tracking = immutable_db_options.enable_thread_tracking;
Expand Down
7 changes: 4 additions & 3 deletions options/options_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,6 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"random_access_max_buffer_size",
{offsetof(struct DBOptions, random_access_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}},
{"writable_file_max_buffer_size",
{offsetof(struct DBOptions, writable_file_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}},
{"use_adaptive_mutex",
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
Expand Down Expand Up @@ -351,6 +348,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{offsetof(struct DBOptions, avoid_flush_during_shutdown),
OptionType::kBoolean, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, avoid_flush_during_shutdown)}},
{"writable_file_max_buffer_size",
{offsetof(struct DBOptions, writable_file_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, writable_file_max_buffer_size)}},
{"allow_ingest_behind",
{offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
Expand Down
2 changes: 2 additions & 0 deletions port/win/env_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,8 @@ EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options,
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
optimized.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size;
return optimized;
}

Expand Down
3 changes: 3 additions & 0 deletions util/file_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"
#include "util/sync_point.h"

namespace rocksdb {

Expand Down Expand Up @@ -151,6 +152,8 @@ class WritableFileWriter {
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter),
stats_(stats) {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
}
Expand Down

0 comments on commit 33c7d4c

Please sign in to comment.