Skip to content

Commit

Permalink
Change the way options.compression_per_level is used when options.lev…
Browse files Browse the repository at this point in the history
…el_compaction_dynamic_level_bytes=true

Summary:
Change the way options.compression_per_level is used when options.level_compaction_dynamic_level_bytes=true so that options.compression_per_level[1] determines compression for the level L0 is merged to, options.compression_per_level[2] to the level after that, etc.

Test Plan: run all tests

Reviewers: rven, yhchiang, kradhakrishnan, igor

Reviewed By: igor

Subscribers: yoshinorim, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D34431
  • Loading branch information
siying committed Mar 11, 2015
1 parent 2b785d7 commit e9de8b6
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 15 deletions.
29 changes: 17 additions & 12 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,26 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
// If enable_compression is false, then compression is always disabled no
// matter what the values of the other two parameters are.
// Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(
const ImmutableCFOptions& ioptions, int level,
const bool enable_compression = true) {
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level,
const bool enable_compression = true) {
if (!enable_compression) {
// disable compression
return kNoCompression;
}
// If the use has specified a different compression level for each level,
// then pick the compression for that level.
if (!ioptions.compression_per_level.empty()) {
assert(level == 0 || level >= base_level);
int idx = (level == 0) ? 0 : level - base_level + 1;

const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
// belongs to. Likewise, if level is beyond the end of the
// specified compression levels, use the last value.
return ioptions.compression_per_level[std::max(0, std::min(level, n))];
return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
} else {
return ioptions.compression;
}
Expand Down Expand Up @@ -417,7 +420,8 @@ Compaction* CompactionPicker::CompactRange(
vstorage->num_levels(), input_level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id, GetCompressionType(ioptions_, output_level));
output_path_id,
GetCompressionType(ioptions_, output_level, vstorage->base_level()));

c->inputs_[0].files = inputs;
if (ExpandWhileOverlapping(cf_name, vstorage, c) == false) {
Expand Down Expand Up @@ -828,11 +832,12 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
}
assert(output_level < NumberLevels());

c = new Compaction(vstorage->num_levels(), level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, output_level));
c = new Compaction(
vstorage->num_levels(), level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, output_level, vstorage->base_level()));
c->score_ = score;

// Pick the largest file in this level that is not already
Expand Down Expand Up @@ -1160,7 +1165,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Compaction* c = new Compaction(
vstorage->num_levels(), kLevel0, kLevel0,
mutable_cf_options.MaxFileSizeForLevel(kLevel0), LLONG_MAX, path_id,
GetCompressionType(ioptions_, kLevel0, enable_compression));
GetCompressionType(ioptions_, kLevel0, 1, enable_compression));
c->score_ = score;

for (unsigned int i = start_index; i < first_index_after; i++) {
Expand Down Expand Up @@ -1280,7 +1285,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Compaction* c =
new Compaction(vstorage->num_levels(), kLevel, kLevel,
mutable_cf_options.MaxFileSizeForLevel(kLevel), LLONG_MAX,
path_id, GetCompressionType(ioptions_, kLevel));
path_id, GetCompressionType(ioptions_, kLevel, 1));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = files[loop];
Expand Down
171 changes: 171 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/convenience.h"
#include "table/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain_table_factory.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
Expand Down Expand Up @@ -10455,6 +10456,11 @@ TEST(DBTest, DynamicLevelMaxBytesBase) {
options.max_background_compactions = max_background_compactions;
options.num_levels = 5;

options.compression_per_level.resize(3);
options.compression_per_level[0] = kNoCompression;
options.compression_per_level[1] = kLZ4Compression;
options.compression_per_level[2] = kSnappyCompression;

DestroyAndReopen(options);

for (int i = 0; i < kNKeys; i++) {
Expand Down Expand Up @@ -10642,6 +10648,171 @@ TEST(DBTest, DynamicLevelMaxBytesBase2) {
ASSERT_EQ(1U, int_prop);
}

TEST(DBTest, DynamicLevelCompressionPerLevel) {
const int kNKeys = 120;
int keys[kNKeys];
for (int i = 0; i < kNKeys; i++) {
keys[i] = i;
}
std::random_shuffle(std::begin(keys), std::end(keys));

Random rnd(301);
Options options;
options.create_if_missing = true;
options.db_write_buffer_size = 20480;
options.write_buffer_size = 20480;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.target_file_size_base = 2048;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 102400;
options.max_bytes_for_level_multiplier = 4;
options.max_background_compactions = 1;
options.num_levels = 5;

options.compression_per_level.resize(3);
options.compression_per_level[0] = kNoCompression;
options.compression_per_level[1] = kNoCompression;
options.compression_per_level[2] = kSnappyCompression;

DestroyAndReopen(options);

// Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
// be compressed, so total data size should be more than 80K.
for (int i = 0; i < 20; i++) {
ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
}
Flush();
dbfull()->TEST_WaitForCompact();

ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_EQ(NumTableFilesAtLevel(3), 0);
ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U);

// Insert 400KB. Some data will be compressed
for (int i = 21; i < 120; i++) {
ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4), 120U * 4000U);
// Make sure data in files in L3 is not compacted by removing all files
// in L4 and calculate number of rows
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
}));
ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(&cf_meta);
for (auto file : cf_meta.levels[4].files) {
ASSERT_OK(dbfull()->DeleteFile(file.name));
}
int num_keys = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U);
}

TEST(DBTest, DynamicLevelCompressionPerLevel2) {
const int kNKeys = 500;
int keys[kNKeys];
for (int i = 0; i < kNKeys; i++) {
keys[i] = i;
}
std::random_shuffle(std::begin(keys), std::end(keys));

Random rnd(301);
Options options;
options.create_if_missing = true;
options.db_write_buffer_size = 6000;
options.write_buffer_size = 6000;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.hard_rate_limit = 1.1;

// Use file size to distinguish levels
// L1: 10, L2: 20, L3 40, L4 80
// L0 is less than 30
options.target_file_size_base = 10;
options.target_file_size_multiplier = 2;

options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 200;
options.max_bytes_for_level_multiplier = 8;
options.max_background_compactions = 1;
options.num_levels = 5;
std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
options.table_factory = mtf;

options.compression_per_level.resize(3);
options.compression_per_level[0] = kNoCompression;
options.compression_per_level[1] = kLZ4Compression;
options.compression_per_level[2] = kZlibCompression;

DestroyAndReopen(options);
// When base level is L4, L4 is LZ4.
std::atomic<bool> seen_lz4(false);
std::function<void(const CompressionType&, uint64_t)> cb1 =
[&](const CompressionType& ct, uint64_t size) {
ASSERT_TRUE(size <= 30 || ct == kLZ4Compression);
if (ct == kLZ4Compression) {
seen_lz4.store(true);
}
};
mock::MockTableBuilder::finish_cb_ = &cb1;
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(seen_lz4.load());

ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_EQ(NumTableFilesAtLevel(3), 0);

// After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
std::atomic<bool> seen_zlib(false);
std::function<void(const CompressionType&, uint64_t)> cb2 =
[&](const CompressionType& ct, uint64_t size) {
ASSERT_TRUE(size <= 30 || ct != kNoCompression);
if (ct == kZlibCompression) {
if (!seen_zlib.load()) {
seen_lz4.store(false);
}
seen_zlib.store(true);
}
// Make sure after making L4 the base level, L4 is LZ4.
if (seen_zlib.load()) {
if (ct == kLZ4Compression && size < 80) {
seen_lz4.store(true);
}
}
};
mock::MockTableBuilder::finish_cb_ = &cb2;
for (int i = 101; i < 500; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
if (i % 100 == 99) {
Flush();
dbfull()->TEST_WaitForCompact();
}
}
ASSERT_TRUE(seen_lz4.load());
ASSERT_TRUE(seen_zlib.load());
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
mock::MockTableBuilder::finish_cb_ = nullptr;
}

TEST(DBTest, DynamicCompactionOptions) {
// minimum write buffer size is enforced at 64KB
const uint64_t k32KB = 1 << 15;
Expand Down
14 changes: 14 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ struct ColumnFamilyOptions {
// be slower. This array, if non-empty, should have an entry for
// each level of the database; these override the value specified in
// the previous field 'compression'.
//
// NOTICE if level_compaction_dynamic_level_bytes=true,
// compression_per_level[0] still determines L0, but other elements
// of the array are based on base level (the level L0 files are merged
// to), and may not match the level users see from info log for metadata.
// If L0 files are merged to level-n, then, for i>0, compression_per_level[i]
// determines compaction type for level n+i-1.
// For example, if we have three 5 levels, and we determine to merge L0
// data to L4 (which means L1..L3 will be empty), then the new files go to
// L4 uses compression type compression_per_level[1].
// If now L0 is merged to L2. Data goes to L2 will be compressed
// according to compression_per_level[1], L3 using compression_per_level[2]
// and L4 using compression_per_level[3]. Compaction for each level can
// change when data grows.
std::vector<CompressionType> compression_per_level;

// different options for compression algorithms
Expand Down
5 changes: 4 additions & 1 deletion table/mock_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ Status MockTableFactory::NewTableReader(
return Status::OK();
}

std::function<void(const CompressionType&, uint64_t)>*
MockTableBuilder::finish_cb_ = nullptr;

TableBuilder* MockTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) const {
uint32_t id = GetAndWriteNextID(file);

return new MockTableBuilder(id, &file_system_);
return new MockTableBuilder(id, &file_system_, compression_type);
}

Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
Expand Down
13 changes: 11 additions & 2 deletions table/mock_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ class MockTableIterator : public Iterator {

class MockTableBuilder : public TableBuilder {
public:
MockTableBuilder(uint32_t id, MockTableFileSystem* file_system)
: id_(id), file_system_(file_system) {}
MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
CompressionType compression_type)
: id_(id),
file_system_(file_system),
compression_type_(compression_type) {}

// REQUIRES: Either Finish() or Abandon() has been called.
~MockTableBuilder() {}
Expand All @@ -114,6 +117,9 @@ class MockTableBuilder : public TableBuilder {
Status status() const override { return Status::OK(); }

Status Finish() override {
if (finish_cb_ != nullptr) {
(*finish_cb_)(compression_type_, FileSize());
}
MutexLock lock_guard(&file_system_->mutex);
file_system_->files.insert({id_, table_});
return Status::OK();
Expand All @@ -125,10 +131,13 @@ class MockTableBuilder : public TableBuilder {

uint64_t FileSize() const override { return table_.size(); }

static std::function<void(const CompressionType&, uint64_t)>* finish_cb_;

private:
uint32_t id_;
MockTableFileSystem* file_system_;
MockFileContents table_;
CompressionType compression_type_;
};

class MockTableFactory : public TableFactory {
Expand Down

0 comments on commit e9de8b6

Please sign in to comment.