Skip to content

Commit

Permalink
Add a counter about estimated pending compaction bytes
Browse files Browse the repository at this point in the history
Summary:
Add a counter of estimated bytes the DB needs to compact for all the compactions to finish. Expose it as a DB Property.
In the future, we can use threshold of this counter to replace soft rate limit and hard rate limit. A single threshold of estimated compaction debt in bytes will be easier for users to reason about when should slow down and stopping than more abstract soft and hard rate limits.

Test Plan: Add unit tests

Reviewers: IslamAbdelRahman, yhchiang, rven, kradhakrishnan, anthony, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D44205
  • Loading branch information
siying committed Aug 21, 2015
1 parent 41a0e28 commit 07d2d34
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 3 deletions.
85 changes: 85 additions & 0 deletions db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,91 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
}

TEST_F(CompactionPickerTest, EstimateCompactionBytesNeeded1) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = false;
mutable_cf_options_.level0_file_num_compaction_trigger = 3;
mutable_cf_options_.max_bytes_for_level_base = 1000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200", 200);
Add(0, 2U, "150", "200", 200);
Add(0, 3U, "150", "200", 200);
// Level 1 is over target by 200
Add(1, 4U, "400", "500", 600);
Add(1, 5U, "600", "700", 600);
// Level 2 is less than target 10000 even added size of level 1
Add(2, 6U, "150", "200", 2500);
Add(2, 7U, "201", "210", 2000);
Add(2, 8U, "300", "310", 2500);
Add(2, 9U, "400", "500", 2500);
// Level 3 exceeds target 100,000 of 1000
Add(3, 10U, "400", "500", 101000);
// Level 4 exceeds target 1,000,000 of 500 after adding size from level 3
Add(4, 11U, "400", "500", 999500);
Add(5, 11U, "400", "500", 8000000);

UpdateVersionStorageInfo();

ASSERT_EQ(2200u + 11000u + 5500u,
vstorage_->estimated_compaction_needed_bytes());
}

TEST_F(CompactionPickerTest, EstimateCompactionBytesNeeded2) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = false;
mutable_cf_options_.level0_file_num_compaction_trigger = 3;
mutable_cf_options_.max_bytes_for_level_base = 1000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 1U, "150", "200", 200);
Add(0, 2U, "150", "200", 200);
Add(0, 4U, "150", "200", 200);
Add(0, 5U, "150", "200", 200);
Add(0, 6U, "150", "200", 200);
// Level 1 is over target by
Add(1, 7U, "400", "500", 200);
Add(1, 8U, "600", "700", 200);
// Level 2 is less than target 10000 even added size of level 1
Add(2, 9U, "150", "200", 9500);
Add(3, 10U, "400", "500", 101000);

UpdateVersionStorageInfo();

ASSERT_EQ(1400u + 4400u + 11000u,
vstorage_->estimated_compaction_needed_bytes());
}

TEST_F(CompactionPickerTest, EstimateCompactionBytesNeededDynamicLevel) {
int num_levels = ioptions_.num_levels;
ioptions_.level_compaction_dynamic_level_bytes = true;
mutable_cf_options_.level0_file_num_compaction_trigger = 3;
mutable_cf_options_.max_bytes_for_level_base = 1000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
NewVersionStorage(num_levels, kCompactionStyleLevel);

// Set Last level size 50000
// num_levels - 1 target 5000
// num_levels - 2 is base level with taret 500
Add(num_levels - 1, 10U, "400", "500", 50000);

Add(0, 1U, "150", "200", 200);
Add(0, 2U, "150", "200", 200);
Add(0, 4U, "150", "200", 200);
Add(0, 5U, "150", "200", 200);
Add(0, 6U, "150", "200", 200);
// num_levels - 3 is over target by 100 + 1000
Add(num_levels - 3, 7U, "400", "500", 300);
Add(num_levels - 3, 8U, "600", "700", 300);
// Level 2 is over target by 1100 + 100
Add(num_levels - 2, 9U, "150", "200", 5100);

UpdateVersionStorageInfo();

ASSERT_EQ(1600u + 12100u + 13200u,
vstorage_->estimated_compaction_needed_bytes());
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
52 changes: 52 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2287,6 +2287,58 @@ TEST_F(DBTest, ApproximateMemoryUsage) {
ASSERT_EQ(unflushed_mem, all_mem);
}

TEST_F(DBTest, EstimatePendingCompBytes) {
// Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);

Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 2;
options.max_background_compactions = 1;
options.max_background_flushes = 1;
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number_to_maintain = 0;
options.write_buffer_size = 1000000;
Reopen(options);

std::string big_value(1000000 * 2, 'x');
std::string num;
uint64_t int_num;

ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
Flush();
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-pending-compaction-bytes", &int_num));
ASSERT_EQ(int_num, 0U);

ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
Flush();
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-pending-compaction-bytes", &int_num));
ASSERT_EQ(int_num, 0U);

ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value));
Flush();
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-pending-compaction-bytes", &int_num));
ASSERT_GT(int_num, 0U);

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();

dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-pending-compaction-bytes", &int_num));
ASSERT_EQ(int_num, 0U);
}

TEST_F(DBTest, FLUSH) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
Expand Down
9 changes: 9 additions & 0 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ static const std::string num_live_versions = "num-live-versions";
static const std::string estimate_live_data_size = "estimate-live-data-size";
static const std::string base_level = "base-level";
static const std::string total_sst_files_size = "total-sst-files-size";
static const std::string estimate_pending_comp_bytes =
"estimate-pending-compaction-bytes";

const std::string DB::Properties::kNumFilesAtLevelPrefix =
rocksdb_prefix + num_files_at_level_prefix;
Expand Down Expand Up @@ -168,6 +170,8 @@ const std::string DB::Properties::kEstimateLiveDataSize =
rocksdb_prefix + estimate_live_data_size;
const std::string DB::Properties::kTotalSstFilesSize =
rocksdb_prefix + total_sst_files_size;
const std::string DB::Properties::kEstimatePendingCompactionBytes =
rocksdb_prefix + estimate_pending_comp_bytes;

DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
bool* need_out_of_mutex) {
Expand Down Expand Up @@ -241,6 +245,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
return kBaseLevel;
} else if (in == total_sst_files_size) {
return kTotalSstFilesSize;
} else if (in == estimate_pending_comp_bytes) {
return kEstimatePendingCompactionBytes;
}
return kUnknown;
}
Expand Down Expand Up @@ -409,6 +415,9 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
case kTotalSstFilesSize:
*value = cfd_->GetTotalSstFilesSize();
return true;
case kEstimatePendingCompactionBytes:
*value = vstorage->estimated_compaction_needed_bytes();
return true;
default:
return false;
}
Expand Down
7 changes: 4 additions & 3 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ enum DBPropertyType : uint32_t {
kNumSnapshots, // Number of snapshots in the system
kOldestSnapshotTime, // Unix timestamp of the first snapshot
kNumLiveVersions,
kEstimateLiveDataSize, // Estimated amount of live data in bytes
kTotalSstFilesSize, // Total size of all sst files.
kBaseLevel, // The level that L0 data is compacted to
kEstimateLiveDataSize, // Estimated amount of live data in bytes
kTotalSstFilesSize, // Total size of all sst files.
kBaseLevel, // The level that L0 data is compacted to
kEstimatePendingCompactionBytes, // Estimated bytes to compaction
};

extern DBPropertyType GetPropertyType(const Slice& property,
Expand Down
58 changes: 58 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ VersionStorageInfo::VersionStorageInfo(
accumulated_num_non_deletions_(0),
accumulated_num_deletions_(0),
num_samples_(0),
estimated_compaction_needed_bytes_(0),
finalized_(false) {
if (ref_vstorage != nullptr) {
accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
Expand Down Expand Up @@ -1014,6 +1015,62 @@ int VersionStorageInfo::MaxInputLevel() const {
return 0;
}

void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) {
// Only implemented for level-based compaction
if (compaction_style_ != kCompactionStyleLevel) {
return;
}

// Start from Level 0, if level 0 qualifies compaction to level 1,
// we estimate the size of compaction.
// Then we move on to the next level and see whether it qualifies compaction
// to the next level. The size of the level is estimated as the actual size
// on the level plus the input bytes from the previous level if there is any.
// If it exceeds, take the exceeded bytes as compaction input and add the size
// of the compaction size to tatal size.
// We keep doing it to Level 2, 3, etc, until the last level and return the
// accumulated bytes.

size_t bytes_compact_to_next_level = 0;
// Level 0
bool level0_compact_triggered = false;
if (static_cast<int>(files_[0].size()) >
mutable_cf_options.level0_file_num_compaction_trigger) {
level0_compact_triggered = true;
for (auto* f : files_[0]) {
bytes_compact_to_next_level += f->fd.GetFileSize();
}
estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
} else {
estimated_compaction_needed_bytes_ = 0;
}

// Level 1 and up.
for (int level = base_level(); level <= MaxInputLevel(); level++) {
size_t level_size = 0;
for (auto* f : files_[level]) {
level_size += f->fd.GetFileSize();
}
if (level == base_level() && level0_compact_triggered) {
// Add base level size to compaction if level0 compaction triggered.
estimated_compaction_needed_bytes_ += level_size;
}
// Add size added by previous compaction
level_size += bytes_compact_to_next_level;
bytes_compact_to_next_level = 0;
size_t level_target = MaxBytesForLevel(level);
if (level_size > level_target) {
bytes_compact_to_next_level = level_size - level_target;
// Simplify to assume the actual compaction fan-out ratio is always
// mutable_cf_options.max_bytes_for_level_multiplier.
estimated_compaction_needed_bytes_ +=
bytes_compact_to_next_level *
(1 + mutable_cf_options.max_bytes_for_level_multiplier);
}
}
}

void VersionStorageInfo::ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options,
const CompactionOptionsFIFO& compaction_options_fifo) {
Expand Down Expand Up @@ -1098,6 +1155,7 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
ComputeFilesMarkedForCompaction();
EstimateCompactionBytesNeeded(mutable_cf_options);
}

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
Expand Down
11 changes: 11 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class VersionStorageInfo {
const MutableCFOptions& mutable_cf_options,
const CompactionOptionsFIFO& compaction_options_fifo);

// Estimate est_comp_needed_bytes_
void EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options);

// This computes files_marked_for_compaction_ and is called by
// ComputeCompactionScore()
void ComputeFilesMarkedForCompaction();
Expand Down Expand Up @@ -315,6 +319,10 @@ class VersionStorageInfo {
// Returns an estimate of the amount of live data in bytes.
uint64_t EstimateLiveDataSize() const;

uint64_t estimated_compaction_needed_bytes() const {
return estimated_compaction_needed_bytes_;
}

private:
const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_;
Expand Down Expand Up @@ -389,6 +397,9 @@ class VersionStorageInfo {
uint64_t accumulated_num_deletions_;
// the number of samples
uint64_t num_samples_;
// Estimated bytes needed to be compacted until all levels' size is down to
// target sizes.
uint64_t estimated_compaction_needed_bytes_;

bool finalized_;

Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ class DB {
// "rocksdb.estimate-live-data-size"
// "rocksdb.total-sst-files-size" - total size of all used sst files, this may
// slow down online queries if there are too many files.
// "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes" - estimated total number of
// bytes compaction needs to rewrite the data to get all levels down
// to under target size. Not valid for other compactions than level-based.
#ifndef ROCKSDB_LITE
struct Properties {
static const std::string kNumFilesAtLevelPrefix;
Expand All @@ -356,6 +360,7 @@ class DB {
static const std::string kNumLiveVersions;
static const std::string kEstimateLiveDataSize;
static const std::string kTotalSstFilesSize;
static const std::string kEstimatePendingCompactionBytes;
};
#endif /* ROCKSDB_LITE */

Expand Down Expand Up @@ -387,6 +392,8 @@ class DB {
// "rocksdb.num-live-versions"
// "rocksdb.estimate-live-data-size"
// "rocksdb.total-sst-files-size"
// "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes"
virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) = 0;
virtual bool GetIntProperty(const Slice& property, uint64_t* value) {
Expand Down

0 comments on commit 07d2d34

Please sign in to comment.