Skip to content

Commit

Permalink
[FLINK-26850][metrics] Add Metric#getMetricType
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Mar 28, 2022
1 parent 865299d commit 5d54921
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public interface Counter extends Metric {
* @return current count
*/
long getCount();

@Override
default MetricType getMetricType() {
return MetricType.COUNTER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public interface Gauge<T> extends Metric {
* @return calculated value
*/
T getValue();

@Override
default MetricType getMetricType() {
return MetricType.GAUGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public interface Histogram extends Metric {
* @return Statistics about the currently recorded elements
*/
HistogramStatistics getStatistics();

@Override
default MetricType getMetricType() {
return MetricType.HISTOGRAM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public interface Meter extends Metric {
* @return number of events marked on the meter
*/
long getCount();

@Override
default MetricType getMetricType() {
return MetricType.METER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@

/** Common super interface for all metrics. */
@Public
public interface Metric {}
public interface Metric {
default MetricType getMetricType() {
throw new UnsupportedOperationException("Custom metric types are not supported.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics;

import org.apache.flink.annotation.Public;

/** Enum describing the different metric types. */
@Public
public enum MetricType {
COUNTER,
METER,
GAUGE,
HISTOGRAM
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,49 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
final String name = group.getMetricIdentifier(metricName, this);

synchronized (this) {
if (metric instanceof Counter) {
counters.put((Counter) metric, name);
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, name);
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, name);
} else if (metric instanceof Meter) {
meters.put((Meter) metric, name);
} else {
log.warn(
"Cannot add unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
switch (metric.getMetricType()) {
case COUNTER:
counters.put((Counter) metric, name);
break;
case GAUGE:
gauges.put((Gauge<?>) metric, name);
break;
case HISTOGRAM:
histograms.put((Histogram) metric, name);
break;
case METER:
meters.put((Meter) metric, name);
break;
default:
log.warn(
"Cannot add unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
}
}
}

@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized (this) {
if (metric instanceof Counter) {
counters.remove(metric);
} else if (metric instanceof Gauge) {
gauges.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else if (metric instanceof Meter) {
meters.remove(metric);
} else {
log.warn(
"Cannot remove unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
switch (metric.getMetricType()) {
case COUNTER:
counters.remove(metric);
break;
case GAUGE:
gauges.remove(metric);
break;
case HISTOGRAM:
histograms.remove(metric);
break;
case METER:
meters.remove(metric);
break;
default:
log.warn(
"Cannot remove unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,42 +77,52 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
tags.addAll(getTagsFromMetricGroup(group));
String host = getHostFromMetricGroup(group);

if (metric instanceof Counter) {
Counter c = (Counter) metric;
counters.put(c, new DCounter(c, name, host, tags, clock));
} else if (metric instanceof Gauge) {
Gauge g = (Gauge) metric;
gauges.put(g, new DGauge(g, name, host, tags, clock));
} else if (metric instanceof Meter) {
Meter m = (Meter) metric;
// Only consider rate
meters.put(m, new DMeter(m, name, host, tags, clock));
} else if (metric instanceof Histogram) {
Histogram h = (Histogram) metric;
histograms.put(h, new DHistogram(h, name, host, tags, clock));
} else {
LOGGER.warn(
"Cannot add unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
switch (metric.getMetricType()) {
case COUNTER:
Counter c = (Counter) metric;
counters.put(c, new DCounter(c, name, host, tags, clock));
break;
case GAUGE:
Gauge g = (Gauge) metric;
gauges.put(g, new DGauge(g, name, host, tags, clock));
break;
case METER:
Meter m = (Meter) metric;
// Only consider rate
meters.put(m, new DMeter(m, name, host, tags, clock));
break;
case HISTOGRAM:
Histogram h = (Histogram) metric;
histograms.put(h, new DHistogram(h, name, host, tags, clock));
break;
default:
LOGGER.warn(
"Cannot add unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
}
}

@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
if (metric instanceof Counter) {
counters.remove(metric);
} else if (metric instanceof Gauge) {
gauges.remove(metric);
} else if (metric instanceof Meter) {
meters.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else {
LOGGER.warn(
"Cannot remove unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
switch (metric.getMetricType()) {
case COUNTER:
counters.remove(metric);
break;
case GAUGE:
gauges.remove(metric);
break;
case METER:
meters.remove(metric);
break;
case HISTOGRAM:
histograms.remove(metric);
break;
default:
LOGGER.warn(
"Cannot remove unknown metric type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,38 +127,43 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
final String fullName = group.getMetricIdentifier(metricName, this);

synchronized (this) {
if (metric instanceof Counter) {
counters.put((Counter) metric, fullName);
registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, fullName);
registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
} else if (metric instanceof Histogram) {
Histogram histogram = (Histogram) metric;
histograms.put(histogram, fullName);

if (histogram instanceof DropwizardHistogramWrapper) {
registry.register(
fullName,
((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
} else {
registry.register(fullName, new FlinkHistogramWrapper(histogram));
}
} else if (metric instanceof Meter) {
Meter meter = (Meter) metric;
meters.put(meter, fullName);

if (meter instanceof DropwizardMeterWrapper) {
registry.register(
fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
} else {
registry.register(fullName, new FlinkMeterWrapper(meter));
}
} else {
log.warn(
"Cannot add metric of type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
switch (metric.getMetricType()) {
case COUNTER:
counters.put((Counter) metric, fullName);
registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
break;
case GAUGE:
gauges.put((Gauge<?>) metric, fullName);
registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
break;
case HISTOGRAM:
Histogram histogram = (Histogram) metric;
histograms.put(histogram, fullName);

if (histogram instanceof DropwizardHistogramWrapper) {
registry.register(
fullName,
((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
} else {
registry.register(fullName, new FlinkHistogramWrapper(histogram));
}
break;
case METER:
Meter meter = (Meter) metric;
meters.put(meter, fullName);

if (meter instanceof DropwizardMeterWrapper) {
registry.register(
fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
} else {
registry.register(fullName, new FlinkMeterWrapper(meter));
}
break;
default:
log.warn(
"Cannot add metric of type {}. This indicates that the reporter "
+ "does not support this metric type.",
metric.getClass().getName());
}
}
}
Expand All @@ -168,16 +173,21 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup
synchronized (this) {
String fullName;

if (metric instanceof Counter) {
fullName = counters.remove(metric);
} else if (metric instanceof Gauge) {
fullName = gauges.remove(metric);
} else if (metric instanceof Histogram) {
fullName = histograms.remove(metric);
} else if (metric instanceof Meter) {
fullName = meters.remove(metric);
} else {
fullName = null;
switch (metric.getMetricType()) {
case COUNTER:
fullName = counters.remove(metric);
break;
case GAUGE:
fullName = gauges.remove(metric);
break;
case HISTOGRAM:
fullName = histograms.remove(metric);
break;
case METER:
fullName = meters.remove(metric);
break;
default:
fullName = null;
}

if (fullName != null) {
Expand Down
Loading

0 comments on commit 5d54921

Please sign in to comment.