Skip to content

Commit

Permalink
Level-based L0->L0 compaction
Browse files Browse the repository at this point in the history
Summary:
Level-based L0->L0 compaction operates on spans of files that aren't currently being compacted. It reduces the number of L0 files, thus making write stall conditions harder to reach.

- L0->L0 is triggered when base level is unavailable due to pending compactions
- L0->L0 always outputs one file of at most `max_level0_burst_file_size` bytes.
- Subcompactions are disabled for L0->L0 since we want to output one file.
- Input files are chosen as the longest span of available files that will fit within the size limit. This minimizes number of files in L0.
Closes facebook#2027

Differential Revision: D4760318

Pulled By: ajkr

fbshipit-source-id: 9d07183
  • Loading branch information
ajkr authored and facebook-github-bot committed Apr 5, 2017
1 parent a12306f commit d659faa
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 30 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

### New Features
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.
* Introduce level-based L0->L0 compactions to reduce file count, so write delays are incurred less often.

## 5.3.0 (03/08/2017)
### Public API Change
Expand Down
2 changes: 1 addition & 1 deletion db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {

one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.level0_file_num_compaction_trigger = 3;
one.write_buffer_size = 120000;

Reopen({default_cf, one});
Expand Down
2 changes: 1 addition & 1 deletion db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ bool Compaction::ShouldFormSubcompactions() const {
return false;
}
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
return start_level_ == 0 && !IsOutputLevelEmpty();
return start_level_ == 0 && output_level_ > 0 && !IsOutputLevelEmpty();
} else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
return number_levels_ > 1 && output_level_ > 0;
} else {
Expand Down
113 changes: 87 additions & 26 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
return;
}
assert(ioptions_.compaction_style != kCompactionStyleLevel ||
c->output_level() == 0 ||
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level()));
if (c->start_level() == 0 ||
ioptions_.compaction_style == kCompactionStyleUniversal) {
Expand Down Expand Up @@ -1047,13 +1048,13 @@ Compaction* LevelCompactionPicker::PickCompaction(
CompactionReason compaction_reason = CompactionReason::kUnknown;

// Find the compactions by size on all levels.
bool skipped_l0 = false;
bool skipped_l0_to_base = false;
for (int i = 0; i < NumberLevels() - 1; i++) {
score = vstorage->CompactionScore(i);
level = vstorage->CompactionScoreLevel(i);
assert(i == 0 || score <= vstorage->CompactionScore(i - 1));
if (score >= 1) {
if (skipped_l0 && level == vstorage->base_level()) {
if (skipped_l0_to_base && level == vstorage->base_level()) {
// If L0->base_level compaction is pending, don't schedule further
// compaction from base level. Otherwise L0->base_level compaction
// may starve.
Expand All @@ -1077,7 +1078,19 @@ Compaction* LevelCompactionPicker::PickCompaction(
// didn't find the compaction, clear the inputs
inputs.clear();
if (level == 0) {
skipped_l0 = true;
skipped_l0_to_base = true;
// L0->base_level may be blocked due to ongoing L0->base_level
// compactions. It may also be blocked by an ongoing compaction from
// base_level downwards.
//
// In these cases, to reduce L0 file count and thus reduce likelihood
// of write stalls, we can attempt compacting a span of files within
// L0.
if (PickIntraL0Compaction(vstorage, mutable_cf_options, &inputs)) {
output_level = 0;
compaction_reason = CompactionReason::kLevelL0FilesNum;
break;
}
}
}
}
Expand All @@ -1102,7 +1115,7 @@ Compaction* LevelCompactionPicker::PickCompaction(

// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
if (level == 0) {
if (level == 0 && output_level != 0) {
assert(level0_compactions_in_progress_.empty());
InternalKey smallest, largest;
GetRange(inputs, &smallest, &largest);
Expand All @@ -1123,33 +1136,40 @@ Compaction* LevelCompactionPicker::PickCompaction(
assert(!inputs.files.empty());
}

// Setup input files from output level
std::vector<CompactionInputFiles> compaction_inputs;
CompactionInputFiles output_level_inputs;
output_level_inputs.level = output_level;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
&output_level_inputs, &parent_index, base_index)) {
return nullptr;
}
std::vector<FileMetaData*> grandparents;
// Setup input files from output level. For output to L0, we only compact
// spans of files that do not interact with any pending compactions, so don't
// need to consider other levels.
if (output_level != 0) {
output_level_inputs.level = output_level;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
&output_level_inputs, &parent_index, base_index)) {
return nullptr;
}

std::vector<CompactionInputFiles> compaction_inputs({inputs});
if (!output_level_inputs.empty()) {
compaction_inputs.push_back(output_level_inputs);
}
compaction_inputs.push_back(inputs);
if (!output_level_inputs.empty()) {
compaction_inputs.push_back(output_level_inputs);
}

// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output leve. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return nullptr;
// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return nullptr;
}
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
} else {
compaction_inputs.push_back(inputs);
}

std::vector<FileMetaData*> grandparents;
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
auto c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
output_level, mutable_cf_options.MaxFileSizeForLevel(output_level),
Expand Down Expand Up @@ -1275,6 +1295,47 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
return inputs->size() > 0;
}

bool LevelCompactionPicker::PickIntraL0Compaction(
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
CompactionInputFiles* inputs) {
inputs->clear();
const std::vector<FileMetaData*>& level_files =
vstorage->LevelFiles(0 /* level */);
if (level_files.size() <
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger + 2) ||
level_files[0]->being_compacted) {
// If L0 isn't accumulating much files beyond the regular trigger, don't
// resort to L0->L0 compaction yet.
return false;
}

size_t compact_bytes = level_files[0]->fd.file_size;
size_t compact_bytes_per_del_file = port::kMaxSizet;
// compaction range will be [0, span_len).
size_t span_len;
// pull in files until the amount of compaction work per deleted file begins
// increasing.
for (span_len = 1; span_len < level_files.size(); ++span_len) {
compact_bytes += level_files[span_len]->fd.file_size;
size_t new_compact_bytes_per_del_file = compact_bytes / span_len;
if (level_files[span_len]->being_compacted ||
new_compact_bytes_per_del_file > compact_bytes_per_del_file) {
break;
}
compact_bytes_per_del_file = new_compact_bytes_per_del_file;
}

if (span_len >= kMinFilesForIntraL0Compaction) {
inputs->level = 0;
for (size_t i = 0; i < span_len; ++i) {
inputs->files.push_back(level_files[i]);
}
return true;
}
return false;
}

#ifndef ROCKSDB_LITE
bool UniversalCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
Expand Down
15 changes: 15 additions & 0 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,28 @@ class LevelCompactionPicker : public CompactionPicker {
int output_level, CompactionInputFiles* inputs,
int* parent_index, int* base_index);

// For L0->L0, picks the longest span of files that aren't currently
// undergoing compaction for which work-per-deleted-file decreases. The span
// always starts from the newest L0 file.
//
// Intra-L0 compaction is independent of all other files, so it can be
// performed even when L0->base_level compactions are blocked.
//
// Returns true if `inputs` is populated with a span of files to be compacted;
// otherwise, returns false.
bool PickIntraL0Compaction(VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
CompactionInputFiles* inputs);

// If there is any file marked for compaction, put put it into inputs.
// This is still experimental. It will return meaningful results only if
// clients call experimental feature SuggestCompactRange()
void PickFilesMarkedForCompactionExperimental(const std::string& cf_name,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs,
int* level, int* output_level);

static const int kMinFilesForIntraL0Compaction = 4;
};

#ifndef ROCKSDB_LITE
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ TEST_F(CompactionPickerTest, LevelMaxScore) {
mutable_cf_options_.target_file_size_base = 10000000;
mutable_cf_options_.target_file_size_multiplier = 10;
mutable_cf_options_.max_bytes_for_level_base = 10 * 1024 * 1024;
Add(0, 1U, "150", "200", 1000000000U);
Add(0, 1U, "150", "200", 1000000U);
// Level 1 score 1.2
Add(1, 66U, "150", "200", 6000000U);
Add(1, 88U, "201", "300", 6000000U);
Expand Down
54 changes: 54 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2492,6 +2492,60 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
Options options = CurrentOptions();
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = 5;
options.max_background_compactions = 2;
options.max_subcompactions = max_subcompactions_;
DestroyAndReopen(options);

const size_t kValueSize = 1 << 20;
Random rnd(301);
std::string value(RandomString(&rnd, kValueSize));

rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"LevelCompactionPicker::PickCompactionBySize:0",
"CompactionJob::Run():Start"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

// index: 0 1 2 3 4 5 6 7 8 9
// size: 1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
// score: 1.5 1.3 1.5 2.0 inf
//
// Files 0-4 will be included in an L0->L1 compaction.
//
// L0->L0 will be triggered since the sync points guarantee compaction to base
// level is still blocked when files 5-9 trigger another compaction.
//
// Files 6-9 are the longest span of available files for which
// work-per-deleted-file decreases (see "score" row above).
for (int i = 0; i < 10; ++i) {
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(0), "")); // prevents trivial move
if (i == 5) {
ASSERT_OK(Put(Key(i + 1), value + value));
} else {
ASSERT_OK(Put(Key(i + 1), value));
}
}
ASSERT_OK(Flush());
}
dbfull()->TEST_WaitForCompact();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();

std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
&level_to_files);
ASSERT_GE(level_to_files.size(), 2); // at least L0 and L1
// L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
ASSERT_EQ(2, level_to_files[0].size());
ASSERT_GT(level_to_files[1].size(), 0);
for (int i = 0; i < 2; ++i) {
ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 21);
}
}

INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false),
Expand Down
8 changes: 8 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ TEST_F(DBRangeDelTest, CompactionRemovesCoveredKeys) {
TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
const int kNumPerFile = 100, kNumFiles = 4, kFileBytes = 100 << 10;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumFiles;
options.max_bytes_for_level_base = 2 * kFileBytes;
options.max_subcompactions = 4;
Expand Down Expand Up @@ -361,7 +362,14 @@ TEST_F(DBRangeDelTest, ValidLevelSubcompactionBoundaries) {
// new L1 files must be generated with non-overlapping key ranges even
// though multiple subcompactions see the same ranges deleted, else an
// assertion will fail.
//
// Only enable auto-compactions when we're ready; otherwise, the
// oversized L0 (relative to base_level) causes the compaction to run
// earlier.
ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(db_->SetOptions(db_->DefaultColumnFamily(),
{{"disable_auto_compactions", "true"}}));
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 0);
ASSERT_GT(NumTableFilesAtLevel(2), 0);
Expand Down
4 changes: 3 additions & 1 deletion db/db_sst_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
blocking_thread.WakeUp();
blocking_thread.WaitUntilDone();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("1,0,0,0,1", FilesPerLevel(0));
// File just flushed is too big for L0 and L1 so gets moved to L2.
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));

metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
Expand Down
8 changes: 8 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,14 @@ void VersionStorageInfo::ComputeCompactionScore(
} else {
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
// Level-based involves L0->L0 compactions that can lead to oversized
// L0 files. Take into account size as well to avoid later giant
// compactions to the base level.
uint64_t base_level_max_bytes = MaxBytesForLevel(base_level());
score = std::max(
score, static_cast<double>(total_size) / base_level_max_bytes);
}
}
} else {
// Compute the ratio of current size to size limit.
Expand Down
1 change: 1 addition & 0 deletions utilities/lua/rocks_lua_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class RocksLuaTest : public testing::Test {
options_ = Options();
options_.create_if_missing = true;
options_.compaction_filter_factory = factory;
options_.disable_auto_compactions = true;
options_.max_bytes_for_level_base =
(kKeySize + kValueSize) * kKeysPerFlush * 2;
options_.max_bytes_for_level_multiplier = 2;
Expand Down

0 comments on commit d659faa

Please sign in to comment.