Skip to content

Commit

Permalink
Adding stats for the merge and filter operation
Browse files Browse the repository at this point in the history
Summary:
We have addded new stats and perf_context for measuring the merge and filter operation time consumption.
We have bounded all the merge operations within the GUARD statment and collected the total time for these operations in the DB.

Test Plan: WIP

Reviewers: rven, yhchiang, kradhakrishnan, igor, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D34377
  • Loading branch information
Anurag Indu authored and Anurag Indu committed Mar 24, 2015
1 parent afc5164 commit 3d1a924
Show file tree
Hide file tree
Showing 18 changed files with 382 additions and 116 deletions.
29 changes: 23 additions & 6 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ Status CompactionJob::Run() {
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact_->compaction));
backup_input->SeekToFirst();
uint64_t total_filter_time = 0;
while (backup_input->Valid() &&
!shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
Expand Down Expand Up @@ -369,7 +370,9 @@ Status CompactionJob::Run() {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact_->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compaction_filter_v2);
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->cur_prefix_ = key_prefix.ToString();
}
Expand Down Expand Up @@ -401,7 +404,9 @@ Status CompactionJob::Run() {
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact_->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compaction_filter_v2);
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

Expand All @@ -417,11 +422,14 @@ Status CompactionJob::Run() {
// finish the last batch
if (status.ok()) {
if (compact_->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compaction_filter_v2);
uint64_t time = 0;
CallCompactionFilterV2(compaction_filter_v2, &time);
total_filter_time += time;
}
compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
} // checking for compaction filter v2

if (status.ok() &&
Expand Down Expand Up @@ -556,6 +564,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;

StopWatchNano timer(env_, stats_ != nullptr);
uint64_t total_filter_time = 0;
while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
!cfd->IsDropped() && status.ok()) {
compact_->num_input_records++;
Expand Down Expand Up @@ -642,9 +653,13 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// the entry with a delete marker.
bool value_changed = false;
compaction_filter_value.clear();
if (stats_ != nullptr) {
timer.Start();
}
bool to_delete = compaction_filter->Filter(
compact_->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
total_filter_time += timer.ElapsedNanos();
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
Expand Down Expand Up @@ -712,7 +727,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// optimization in BuildTable.
int steps = 0;
merge.MergeUntil(input, prev_snapshot, bottommost_level_,
db_options_.statistics.get(), &steps);
db_options_.statistics.get(), &steps, env_);
// Skip the Merge ops
combined_idx = combined_idx - 1 + steps;

Expand Down Expand Up @@ -844,6 +859,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
input->Next();
}
}
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
}
Expand All @@ -859,7 +875,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
}

void CompactionJob::CallCompactionFilterV2(
CompactionFilterV2* compaction_filter_v2) {
CompactionFilterV2* compaction_filter_v2, uint64_t* time) {
if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
return;
}
Expand Down Expand Up @@ -889,10 +905,11 @@ void CompactionJob::CallCompactionFilterV2(
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
StopWatchNano timer(env_, stats_ != nullptr);
compact_->to_delete_buf_ = compaction_filter_v2->Filter(
compact_->compaction->level(), user_key_buf, existing_value_buf,
&compact_->new_value_buf_, &compact_->value_changed_buf_);

*time = timer.ElapsedNanos();
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
// kv-pairs in this compaction run needs to be deleted.
assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size());
Expand Down
3 changes: 2 additions & 1 deletion db/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class CompactionJob {
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
bool is_compaction_v2);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
uint64_t* time);
Status FinishCompactionOutputFile(Iterator* input);
Status InstallCompactionResults(InstrumentedMutex* db_mutex);
SequenceNumber findEarliestVisibleSnapshot(
Expand Down
60 changes: 45 additions & 15 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,14 @@ void DBIter::MergeValuesNewToOld() {
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice val = iter_->value();
user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
&saved_value_, logger_);
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
// iter_ is positioned after put
iter_->Next();
return;
Expand All @@ -328,12 +334,17 @@ void DBIter::MergeValuesNewToOld() {
}
}

// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
}

void DBIter::Prev() {
Expand Down Expand Up @@ -434,14 +445,24 @@ bool DBIter::FindValueForCurrentKey() {
return false;
case kTypeMerge:
if (last_not_merge_type == kTypeDeletion) {
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
} else {
assert(last_not_merge_type == kTypeValue);
std::string last_put_value = saved_value_;
Slice temp_slice(last_put_value);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
operands, &saved_value_, logger_);
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
operands, &saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
timer.ElapsedNanos());
}
}
break;
case kTypeValue:
Expand Down Expand Up @@ -492,9 +513,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!iter_->Valid() ||
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) ||
ikey.type == kTypeDeletion) {
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);

{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0)) {
Expand All @@ -506,8 +531,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}

const Slice& val = iter_->value();
user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
&saved_value_, logger_);
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
&saved_value_, logger_);
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
}
valid_ = true;
return true;
}
Expand Down
Loading

0 comments on commit 3d1a924

Please sign in to comment.