Skip to content

Commit

Permalink
[FLINK-8419] [kafka] Register metrics for dynamically discovered Kafk…
Browse files Browse the repository at this point in the history
…a partitions

This closes apache#5335.
  • Loading branch information
tzulitai committed Feb 6, 2018
1 parent 0a1ce00 commit 40f26c8
Showing 1 changed file with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
Expand Down Expand Up @@ -112,6 +113,27 @@ public abstract class AbstractFetcher<T, KPH> {
private volatile long maxWatermarkSoFar = Long.MIN_VALUE;

// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------

/**
* Flag indicating whether or not metrics should be exposed.
* If {@code true}, offset metrics (e.g. current offset, committed offset) and
* Kafka-shipped metrics will be registered.
*/
private final boolean useMetrics;

/**
* The metric group which all metrics for the consumer should be registered to.
* This metric group is defined under the user scope {@link KafkaConsumerMetricConstants#KAFKA_CONSUMER_METRICS_GROUP}.
*/
private final MetricGroup consumerMetricGroup;

@Deprecated
private final MetricGroup legacyCurrentOffsetsMetricGroup;

@Deprecated
private final MetricGroup legacyCommittedOffsetsMetricGroup;

protected AbstractFetcher(
SourceContext<T> sourceContext,
Expand All @@ -127,6 +149,11 @@ protected AbstractFetcher(
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

// figure out what we watermark mode we will be using
this.watermarksPeriodic = watermarksPeriodic;
this.watermarksPunctuated = watermarksPunctuated;
Expand Down Expand Up @@ -168,8 +195,9 @@ protected AbstractFetcher(
unassignedPartitionsQueue.add(partition);
}

// register metrics for the initial seed partitions
if (useMetrics) {
addOffsetStateGauge(checkNotNull(consumerMetricGroup));
registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
}

// if we have periodic watermarks, kick off the interval scheduler
Expand Down Expand Up @@ -207,6 +235,10 @@ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr
watermarksPunctuated,
userCodeClassLoader);

if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
}

for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
subscribedPartitionStates.add(newPartitionState);
unassignedPartitionsQueue.add(newPartitionState);
Expand Down Expand Up @@ -568,23 +600,29 @@ private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
// ------------------------- Metrics ----------------------------------

/**
* Add current and committed offsets to metric group.
* For each partition, register a new metric group to expose current offsets and committed offsets.
* Per-partition metric groups can be scoped by user variables {@link KafkaConsumerMetricConstants#OFFSETS_BY_TOPIC_METRICS_GROUP}
* and {@link KafkaConsumerMetricConstants#OFFSETS_BY_PARTITION_METRICS_GROUP}.
*
* <p>Note: this method also registers gauges for deprecated offset metrics, to maintain backwards compatibility.
*
* @param metricGroup The metric group to use
* @param consumerMetricGroup The consumer metric group
* @param partitionOffsetStates The partition offset state holders, whose values will be used to update metrics
*/
protected void addOffsetStateGauge(MetricGroup metricGroup) {
MetricGroup legacyCurrentOffsetsGroup = metricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
MetricGroup legacyCommittedOffsetsGroup = metricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
private void registerOffsetMetrics(
MetricGroup consumerMetricGroup,
List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {

for (KafkaTopicPartitionState<KPH> ktp : subscribedPartitionStates) {
MetricGroup topicPartitionGroup = metricGroup
for (KafkaTopicPartitionState<KPH> ktp : partitionOffsetStates) {
MetricGroup topicPartitionGroup = consumerMetricGroup
.addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, ktp.getTopic())
.addGroup(OFFSETS_BY_PARTITION_METRICS_GROUP, Integer.toString(ktp.getPartition()));

topicPartitionGroup.gauge(CURRENT_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
topicPartitionGroup.gauge(COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));

legacyCurrentOffsetsGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
legacyCommittedOffsetsGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
legacyCurrentOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
legacyCommittedOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
}
}

Expand Down

0 comments on commit 40f26c8

Please sign in to comment.