Skip to content

Commit

Permalink
[metrics] Merge metrics by the same attribute
Browse files Browse the repository at this point in the history
This patch merges metrics that have the same value for certain
attributes, in order to reduce the total data size received by
third-party monitoring systems that do not care about the original
metric details.
For example, fetch metrics from tserver by:
http://<host>:<port>/metrics?merge_rules=tablet|table|table_name

All 'tablet' type metrics that have the same value for the 'table_name'
attribute will be merged together into a new 'table' type metric,
and the metric values will be aggregated.

Change-Id: I8db3d082ae847eb1d83b9e4aee57d5e4bf13e1b5
Reviewed-on: http://gerrit.cloudera.org:8080/13580
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
acelyc111 authored and adembo committed Aug 15, 2019
1 parent 0d48cad commit fe6e5cc
Show file tree
Hide file tree
Showing 7 changed files with 874 additions and 176 deletions.
23 changes: 18 additions & 5 deletions src/kudu/server/default_path_handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,30 @@ static void WriteMetricsAsJson(const MetricRegistry* const metrics,
MetricJsonOptions opts;
opts.include_raw_histograms = ParseBool(req.parsed_args, "include_raw_histograms");
opts.include_schema_info = ParseBool(req.parsed_args, "include_schema");
opts.entity_types = ParseArray(req.parsed_args, "types");
opts.entity_ids = ParseArray(req.parsed_args, "ids");
opts.entity_attrs = ParseArray(req.parsed_args, "attributes");
opts.entity_metrics = ParseArray(req.parsed_args, "metrics");

MetricFilters& filters = opts.filters;
filters.entity_types = ParseArray(req.parsed_args, "types");
filters.entity_ids = ParseArray(req.parsed_args, "ids");
filters.entity_attrs = ParseArray(req.parsed_args, "attributes");
filters.entity_metrics = ParseArray(req.parsed_args, "metrics");
vector<string> merge_rules = ParseArray(req.parsed_args, "merge_rules");
for (const auto& merge_rule : merge_rules) {
vector<string> values;
SplitStringUsing(merge_rule, "|", &values);
if (values.size() == 3) {
// Index 0: entity type needed to be merged.
// Index 1: 'merge_to' field of MergeAttributes.
// Index 2: 'attribute_to_merge_by' field of MergeAttributes.
EmplaceIfNotPresent(&opts.merge_rules, values[0], MergeAttributes(values[1], values[2]));
}
}

JsonWriter::Mode json_mode = ParseBool(req.parsed_args, "compact") ?
JsonWriter::COMPACT : JsonWriter::PRETTY;

// The number of entity_attrs should always be even because
// each pair represents a key and a value.
if (opts.entity_attrs.size() % 2 != 0) {
if (filters.entity_attrs.size() % 2 != 0) {
resp->status_code = HttpStatusCode::BadRequest;
WARN_NOT_OK(Status::InvalidArgument(""), "The parameter of 'attributes' is wrong");
} else {
Expand Down
28 changes: 28 additions & 0 deletions src/kudu/util/hdr_histogram-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,32 @@ TEST_F(HdrHistogramTest, PercentileAndCopyTest) {
ASSERT_EQ(hist.TotalSum(), copy.TotalSum());
}

// Record one sample in 'histogram' for every integer value in the
// inclusive range [low, high].
void PopulateHistogram(HdrHistogram* histogram, uint64_t low, uint64_t high) {
  uint64_t value = low;
  while (value <= high) {
    histogram->Increment(value);
    ++value;
  }
}

TEST_F(HdrHistogramTest, MergeTest) {
  const uint64_t kHighestVal = 10000LU;

  HdrHistogram merged(kHighestVal, kSigDigits);
  HdrHistogram source(kHighestVal, kSigDigits);

  // Populate two disjoint value ranges, snapshot the destination, then merge.
  PopulateHistogram(&merged, 1, 100);
  PopulateHistogram(&source, 101, 250);
  HdrHistogram snapshot(merged);
  merged.MergeFrom(source);

  // Totals must equal the sum of both inputs.
  ASSERT_EQ(snapshot.TotalCount() + source.TotalCount(), merged.TotalCount());
  ASSERT_EQ(snapshot.TotalSum() + source.TotalSum(), merged.TotalSum());
  // Extremes must span the union of both populated ranges.
  ASSERT_EQ(1, merged.MinValue());
  ASSERT_EQ(250, merged.MaxValue());
  // Loose tolerances: histogram buckets quantize the recorded values.
  ASSERT_NEAR((1 + 250) / 2.0, merged.MeanValue(), 1e3);
  ASSERT_EQ(250, merged.ValueAtPercentile(100.0));
  ASSERT_NEAR(250 * 99.0 / 100, merged.ValueAtPercentile(99.0), 1e3);
  ASSERT_NEAR(250 * 95.0 / 100, merged.ValueAtPercentile(95.0), 1e3);
  ASSERT_NEAR(250 * 50.0 / 100, merged.ValueAtPercentile(50.0), 1e3);
}

} // namespace kudu
61 changes: 44 additions & 17 deletions src/kudu/util/hdr_histogram.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ void HdrHistogram::Increment(int64_t value) {
IncrementBy(value, 1);
}

// Lowers 'min_value_' to 'min' and raises 'max_value_' to 'max', leaving
// each untouched when it is already at least as extreme. Lock-free: each
// side re-reads the current extreme and retries its CAS until either the
// CAS succeeds or another thread has installed a value that makes this
// update unnecessary.
void HdrHistogram::UpdateMinMax(int64_t min, int64_t max) {
  // Update min, if needed.
  {
    Atomic64 min_val;
    // Loop exits when 'min' is no longer below the stored minimum.
    while (PREDICT_FALSE(min < (min_val = MinValue()))) {
      Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, min);
      if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
    }
  }

  // Update max, if needed.
  {
    Atomic64 max_val;
    // Loop exits when 'max' is no longer above the stored maximum.
    while (PREDICT_FALSE(max > (max_val = MaxValue()))) {
      Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, max);
      if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
    }
  }
}

void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
DCHECK_GE(value, 0);
DCHECK_GE(count, 0);
Expand All @@ -170,23 +190,7 @@ void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
NoBarrier_AtomicIncrement(&total_count_, count);
NoBarrier_AtomicIncrement(&total_sum_, value * count);

// Update min, if needed.
{
Atomic64 min_val;
while (PREDICT_FALSE(value < (min_val = MinValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, value);
if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
}
}

// Update max, if needed.
{
Atomic64 max_val;
while (PREDICT_FALSE(value > (max_val = MaxValue()))) {
Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, value);
if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
}
}
UpdateMinMax(value, value);
}

void HdrHistogram::IncrementWithExpectedInterval(int64_t value,
Expand Down Expand Up @@ -343,6 +347,29 @@ void HdrHistogram::DumpHumanReadable(std::ostream* out) const {
}
}

void HdrHistogram::MergeFrom(const HdrHistogram& other) {
DCHECK_EQ(highest_trackable_value_, other.highest_trackable_value());
DCHECK_EQ(num_significant_digits_, other.num_significant_digits());
DCHECK_EQ(counts_array_length_, other.counts_array_length_);
DCHECK_EQ(bucket_count_, other.bucket_count_);
DCHECK_EQ(sub_bucket_count_, other.sub_bucket_count_);
DCHECK_EQ(sub_bucket_half_count_magnitude_, other.sub_bucket_half_count_magnitude_);
DCHECK_EQ(sub_bucket_half_count_, other.sub_bucket_half_count_);
DCHECK_EQ(sub_bucket_mask_, other.sub_bucket_mask_);

NoBarrier_AtomicIncrement(&total_count_, other.total_count_);
NoBarrier_AtomicIncrement(&total_sum_, other.total_sum_);

UpdateMinMax(other.min_value_, other.max_value_);

for (int i = 0; i < counts_array_length_; i++) {
Atomic64 count = NoBarrier_Load(&other.counts_[i]);
if (count > 0) {
NoBarrier_AtomicIncrement(&counts_[i], count);
}
}
}

///////////////////////////////////////////////////////////////////////
// AbstractHistogramIterator
///////////////////////////////////////////////////////////////////////
Expand Down
7 changes: 7 additions & 0 deletions src/kudu/util/hdr_histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ class HdrHistogram {

// Dump a formatted, multiline string describing this histogram to 'out'.
void DumpHumanReadable(std::ostream* out) const;

// Merges 'other' into this HdrHistogram. Values in each 'counts_' array
// bucket will be added up, and the related 'min_value_', 'max_value_',
// 'total_count_' and 'total_sum_' will be updated if needed.
void MergeFrom(const HdrHistogram& other);

private:
friend class AbstractHistogramIterator;

Expand All @@ -179,6 +185,7 @@ class HdrHistogram {

void Init();
int CountsArrayIndex(int bucket_index, int sub_bucket_index) const;
void UpdateMinMax(int64_t min, int64_t max);

uint64_t highest_trackable_value_;
int num_significant_digits_;
Expand Down
Loading

0 comments on commit fe6e5cc

Please sign in to comment.