Enforce write buffer memory limit across column families
Summary:
Introduces a new class for managing write buffer memory across column
families. We supplement ColumnFamilyOptions::write_buffer_size with
DBOptions::db_write_buffer_size, a limit on the combined memory used by
all memtables. A single WriteBuffer instance, shared by every column
family, tracks that usage and triggers memtable flushes to disk once the
limit is reached.
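
The new db/writebuffer.h is not included in the hunks shown below. What follows is only a minimal sketch of the interface implied by its call sites in this diff — a constructor taking db_write_buffer_size, plus the memory_usage(), buffer_size() and ShouldFlush() calls in DBImpl::Write(); the ReserveMem()/FreeMem() accounting hooks are assumptions, not the verbatim header.

#include <cstddef>

namespace rocksdb {

// Sketch only: inferred from call sites in this commit, not the actual
// contents of db/writebuffer.h.
class WriteBuffer {
 public:
  explicit WriteBuffer(size_t buffer_size)
      : buffer_size_(buffer_size), memory_used_(0) {}

  size_t memory_usage() const { return memory_used_; }
  size_t buffer_size() const { return buffer_size_; }

  // Checked from the write thread; a buffer_size of 0 disables the limit.
  bool ShouldFlush() const {
    return buffer_size_ > 0 && memory_used_ >= buffer_size_;
  }

  // Assumed accounting hooks, called as memtables allocate and free memory.
  void ReserveMem(size_t mem) { memory_used_ += mem; }
  void FreeMem(size_t mem) { memory_used_ -= mem; }

 private:
  const size_t buffer_size_;
  size_t memory_used_;
};

}  // namespace rocksdb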

Test Plan: Added SharedWriteBuffer unit test to db_test.cc

Reviewers: sdong, rven, ljin, igor

Reviewed By: igor

Subscribers: tnovak, yhchiang, dhruba, xjin, MarkCallaghan, yoshinorim

Differential Revision: https://reviews.facebook.net/D22581
Jonah Cohen committed Dec 2, 2014
1 parent 37d73d5 commit a14b787
Showing 45 changed files with 551 additions and 148 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -8,6 +8,7 @@
database which is an image of the existing database.
* New API LinkFile added to Env. If you implement your own Env class, an
implementation of the API LinkFile will have to be provided.
* MemTableRep takes MemTableAllocator instead of Arena

## 3.8.0 (11/14/2014)

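The HISTORY.md entry above notes that MemTableRep now takes a MemTableAllocator instead of a bare Arena. That allocator is not visible in the hunks shown here; the snippet below is only an illustrative sketch of the idea, with a hypothetical class name, and it assumes the ReserveMem() hook from the WriteBuffer sketch earlier.

#include <cstddef>

#include "db/writebuffer.h"
#include "util/arena.h"

namespace rocksdb {

// Hypothetical sketch: charge every memtable allocation against the shared
// WriteBuffer before serving it from the per-memtable Arena.
class SharedBufferAllocatorSketch {
 public:
  SharedBufferAllocatorSketch(Arena* arena, WriteBuffer* write_buffer)
      : arena_(arena), write_buffer_(write_buffer) {}

  char* Allocate(size_t bytes) {
    if (write_buffer_ != nullptr) {
      write_buffer_->ReserveMem(bytes);  // assumed accounting hook
    }
    return arena_->Allocate(bytes);
  }

 private:
  Arena* const arena_;
  WriteBuffer* const write_buffer_;
};

}  // namespace rocksdb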
5 changes: 5 additions & 0 deletions db/c.cc
@@ -1264,6 +1264,11 @@ void rocksdb_options_set_info_log_level(
opt->rep.info_log_level = static_cast<InfoLogLevel>(v);
}

void rocksdb_options_set_db_write_buffer_size(rocksdb_options_t* opt,
size_t s) {
opt->rep.db_write_buffer_size = s;
}

void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) {
opt->rep.write_buffer_size = s;
}
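
For reference, a hedged usage example of the new setter through the C API; the database path and byte counts are arbitrary, and error handling is kept minimal. The snippet compiles as either C or C++.

#include <stdio.h>
#include <stdlib.h>

#include "rocksdb/c.h"

int main() {
  rocksdb_options_t* options = rocksdb_options_create();
  rocksdb_options_set_create_if_missing(options, 1);
  /* Cap all memtables at ~100 MB combined; leave the per-memtable cap lower. */
  rocksdb_options_set_db_write_buffer_size(options, 100 * 1024 * 1024);
  rocksdb_options_set_write_buffer_size(options, 64 * 1024 * 1024);

  char* err = NULL;
  rocksdb_t* db = rocksdb_open(options, "/tmp/shared_write_buffer_example", &err);
  if (err != NULL) {
    fprintf(stderr, "open failed: %s\n", err);
    free(err);
  } else {
    rocksdb_close(db);
  }
  rocksdb_options_destroy(options);
  return 0;
}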
24 changes: 19 additions & 5 deletions db/column_family.cc
@@ -21,6 +21,9 @@

#include "db/compaction_picker.h"
#include "db/db_impl.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/table_properties_collector.h"
@@ -223,6 +226,7 @@ void SuperVersionUnrefHandle(void* ptr) {
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* _dummy_versions,
Cache* _table_cache,
WriteBuffer* write_buffer,
const ColumnFamilyOptions& cf_options,
const DBOptions* db_options,
const EnvOptions& env_options,
@@ -237,6 +241,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_, ioptions_),
write_buffer_(write_buffer),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
@@ -413,13 +418,19 @@ void ColumnFamilyData::SetCurrent(Version* current_version) {
current_ = current_version;
}

void ColumnFamilyData::CreateNewMemtable(
MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options) {
assert(current_ != nullptr);
return new MemTable(internal_comparator_, ioptions_,
mutable_cf_options, write_buffer_);
}

void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, ioptions_, mutable_cf_options);
SetMemtable(ConstructNewMemtable(mutable_cf_options));
mem_->Ref();
}

@@ -600,16 +611,18 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Cache* table_cache,
WriteBuffer* write_buffer,
WriteController* write_controller)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr,
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
ColumnFamilyOptions(), db_options,
env_options, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_buffer_(write_buffer),
write_controller_(write_controller),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list
@@ -674,8 +687,9 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, table_cache_, options,
db_options_, env_options_, this);
new ColumnFamilyData(id, name, dummy_versions, table_cache_,
write_buffer_, options, db_options_,
env_options_, this);
Lock();
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
9 changes: 7 additions & 2 deletions db/column_family.h
@@ -201,8 +201,9 @@ class ColumnFamilyData {
MemTable* mem() { return mem_; }
Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetCurrent(Version* current);
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options);
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options);

TableCache* table_cache() const { return table_cache_.get(); }
@@ -264,6 +265,7 @@ class ColumnFamilyData {
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBuffer* write_buffer,
const ColumnFamilyOptions& options,
const DBOptions* db_options, const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
@@ -294,6 +296,8 @@ class ColumnFamilyData {

std::unique_ptr<InternalStats> internal_stats_;

WriteBuffer* write_buffer_;

MemTable* mem_;
MemTableList imm_;
SuperVersion* super_version_;
@@ -366,7 +370,7 @@ class ColumnFamilySet {

ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteController* write_controller);
WriteBuffer* write_buffer, WriteController* write_controller);
~ColumnFamilySet();

ColumnFamilyData* GetDefault() const;
@@ -421,6 +425,7 @@
const DBOptions* const db_options_;
const EnvOptions env_options_;
Cache* table_cache_;
WriteBuffer* write_buffer_;
WriteController* write_controller_;
std::atomic_flag spin_lock_;
};
6 changes: 5 additions & 1 deletion db/compaction_job_test.cc
@@ -9,6 +9,7 @@
#include "db/compaction_job.h"
#include "db/column_family.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/db.h"
@@ -26,8 +27,10 @@ class CompactionJobTest {
dbname_(test::TmpDir() + "/compaction_job_test"),
mutable_cf_options_(Options(), ImmutableCFOptions(Options())),
table_cache_(NewLRUCache(50000, 16, 8)),
write_buffer_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_)),
table_cache_.get(), &write_buffer_,
&write_controller_)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()) {
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
@@ -125,6 +128,7 @@ class CompactionJobTest {
WriteController write_controller_;
DBOptions db_options_;
ColumnFamilyOptions cf_options_;
WriteBuffer write_buffer_;
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
4 changes: 4 additions & 0 deletions db/db_bench.cc
@@ -198,6 +198,9 @@ DEFINE_bool(enable_numa, false,
"CPU and memory of same node. Use \"$numactl --hardware\" command "
"to see NUMA memory architecture.");

DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
"Number of bytes to buffer in all memtables before compacting");

DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
"Number of bytes to buffer in memtable before compacting");

@@ -1834,6 +1837,7 @@ class Benchmark {
Options options;
options.create_if_missing = !FLAGS_use_existing_db;
options.create_missing_column_families = FLAGS_num_column_families > 1;
options.db_write_buffer_size = FLAGS_db_write_buffer_size;
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =
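The new db_bench flag above simply feeds DBOptions::db_write_buffer_size. Applications can set the same pair of options directly through the C++ API; below is a minimal sketch, with an illustrative path and sizes.

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Shared cap across all column families (the new option).
  options.db_write_buffer_size = 100 * 1024 * 1024;
  // Per-memtable cap; set high enough that the shared cap triggers first.
  options.write_buffer_size = 500 * 1024 * 1024;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/db_write_buffer_example", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}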
25 changes: 22 additions & 3 deletions db/db_impl.cc
@@ -44,6 +44,7 @@
#include "db/forward_iterator.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "rocksdb/cache.h"
@@ -201,6 +202,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
default_cf_handle_(nullptr),
total_log_size_(0),
max_total_in_memory_state_(0),
write_buffer_(options.db_write_buffer_size),
tmp_batch_(),
bg_schedule_needed_(false),
bg_compaction_scheduled_(0),
@@ -231,7 +233,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
db_options_.table_cache_remove_scan_count_limit);

versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_));
table_cache_.get(), &write_buffer_,
&write_controller_));
column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
versions_->GetColumnFamilySet(), &flush_scheduler_));

@@ -2823,6 +2826,23 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
}
}
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing all column families. Write buffer is using %" PRIu64
" bytes out of a total of %" PRIu64 ".",
write_buffer_.memory_usage(), write_buffer_.buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsEmpty()) {
status = SetNewMemtableAndNewLogFile(cfd, &context);
if (!status.ok()) {
break;
}
cfd->imm()->FlushRequested();
}
}
MaybeScheduleFlushOrCompaction();
}

if (UNLIKELY(status.ok() && !bg_error_.ok())) {
@@ -3030,8 +3050,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
}

if (s.ok()) {
new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
mutable_cf_options);
new_mem = cfd->ConstructNewMemtable(mutable_cf_options);
new_superversion = new SuperVersion();
}
}
3 changes: 3 additions & 0 deletions db/db_impl.h
@@ -24,6 +24,7 @@
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
#include "db/writebuffer.h"
#include "memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
@@ -436,6 +437,8 @@ class DBImpl : public DB {

std::unique_ptr<Directory> db_directory_;

WriteBuffer write_buffer_;

WriteThread write_thread_;

WriteBatch tmp_batch_;
86 changes: 83 additions & 3 deletions db/db_test.cc
@@ -3445,7 +3445,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {

// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conidtions, which may change from
// 2. Made assumption on the memtable flush conditions, which may change from
// time to time.
TEST(DBTest, UniversalCompactionTrigger) {
Options options;
@@ -3521,7 +3521,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
}
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
// After comapction, we should have 2 files, with size 4, 2.4.
// After compaction, we should have 2 files, with size 4, 2.4.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
@@ -3549,7 +3549,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
}
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
// After comapction, we should have 3 files, with size 4, 2.4, 2.
// After compaction, we should have 3 files, with size 4, 2.4, 2.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
@@ -6802,6 +6802,86 @@ TEST(DBTest, RecoverCheckFileAmount) {
}
}

TEST(DBTest, SharedWriteBuffer) {
Options options;
options.db_write_buffer_size = 100000; // this is the real limit
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

// Trigger a flush on every CF
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
ASSERT_OK(Put(2, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}

// Flush 'dobrynia' and 'nikitich'
ASSERT_OK(Put(2, Key(2), DummyString(50000)));
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(3), DummyString(20000)));
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}

// Make 'dobrynia' and 'nikitich' both take up 40% of space
// When 'pikachu' puts us over 100%, all 3 flush.
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(3));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(3));
}

// Some remaining writes so 'default' and 'nikitich' flush on closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(3));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(4));
}
}

TEST(DBTest, PurgeInfoLogs) {
Options options = CurrentOptions();
options.keep_log_file_num = 5;