diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 27afbca8cc507..b6ce65a1d59d8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,6 +26,7 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
@@ -112,6 +113,27 @@ public abstract class AbstractFetcher<T, KPH> {
 	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
 
 	// ------------------------------------------------------------------------
+	//  Metrics
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Flag indicating whether or not metrics should be exposed.
+	 * If {@code true}, offset metrics (e.g. current offset, committed offset) and
+	 * Kafka-shipped metrics will be registered.
+	 */
+	private final boolean useMetrics;
+
+	/**
+	 * The metric group which all metrics for the consumer should be registered to.
+	 * This metric group is defined under the user scope {@link KafkaConsumerMetricConstants#KAFKA_CONSUMER_METRICS_GROUP}.
+	 */
+	private final MetricGroup consumerMetricGroup;
+
+	@Deprecated
+	private final MetricGroup legacyCurrentOffsetsMetricGroup;
+
+	@Deprecated
+	private final MetricGroup legacyCommittedOffsetsMetricGroup;
 
 	protected AbstractFetcher(
 			SourceContext<T> sourceContext,
@@ -127,6 +149,11 @@ protected AbstractFetcher(
 		this.checkpointLock = sourceContext.getCheckpointLock();
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 
+		this.useMetrics = useMetrics;
+		this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
+		this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
+		this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
+
 		// figure out what we watermark mode we will be using
 		this.watermarksPeriodic = watermarksPeriodic;
 		this.watermarksPunctuated = watermarksPunctuated;
@@ -168,8 +195,9 @@ protected AbstractFetcher(
 			unassignedPartitionsQueue.add(partition);
 		}
 
+		// register metrics for the initial seed partitions
 		if (useMetrics) {
-			addOffsetStateGauge(checkNotNull(consumerMetricGroup));
+			registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
 		}
 
 		// if we have periodic watermarks, kick off the interval scheduler
@@ -207,6 +235,10 @@ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr
 				watermarksPunctuated,
 				userCodeClassLoader);
 
+		if (useMetrics) {
+			registerOffsetMetrics(consumerMetricGroup, newPartitionStates);
+		}
+
 		for (KafkaTopicPartitionState<KPH> newPartitionState : newPartitionStates) {
 			subscribedPartitionStates.add(newPartitionState);
 			unassignedPartitionsQueue.add(newPartitionState);
@@ -568,23 +600,29 @@ private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(
 	// ------------------------- Metrics ----------------------------------
 
 	/**
-	 * Add current and committed offsets to metric group.
+	 * For each partition, register a new metric group to expose current offsets and committed offsets.
+	 * Per-partition metric groups can be scoped by user variables {@link KafkaConsumerMetricConstants#OFFSETS_BY_TOPIC_METRICS_GROUP}
+	 * and {@link KafkaConsumerMetricConstants#OFFSETS_BY_PARTITION_METRICS_GROUP}.
+	 *
+	 * <p>Note: this method also registers gauges for deprecated offset metrics, to maintain backwards compatibility.
 	 *
-	 * @param metricGroup The metric group to use
+	 * @param consumerMetricGroup The consumer metric group
+	 * @param partitionOffsetStates The partition offset state holders, whose values will be used to update metrics
 	 */
-	protected void addOffsetStateGauge(MetricGroup metricGroup) {
-		MetricGroup legacyCurrentOffsetsGroup = metricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
-		MetricGroup legacyCommittedOffsetsGroup = metricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
+	private void registerOffsetMetrics(
+			MetricGroup consumerMetricGroup,
+			List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {
 
-		for (KafkaTopicPartitionState<KPH> ktp : subscribedPartitionStates) {
-			MetricGroup topicPartitionGroup = metricGroup
+		for (KafkaTopicPartitionState<KPH> ktp : partitionOffsetStates) {
+			MetricGroup topicPartitionGroup = consumerMetricGroup
 				.addGroup(OFFSETS_BY_TOPIC_METRICS_GROUP, ktp.getTopic())
 				.addGroup(OFFSETS_BY_PARTITION_METRICS_GROUP, Integer.toString(ktp.getPartition()));
+
 			topicPartitionGroup.gauge(CURRENT_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
 			topicPartitionGroup.gauge(COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 
-			legacyCurrentOffsetsGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
-			legacyCommittedOffsetsGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+			legacyCurrentOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
+			legacyCommittedOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(ktp), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 		}
 	}
 
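
For reference, the OffsetGauge and OffsetGaugeType used in registerOffsetMetrics are defined elsewhere in AbstractFetcher.java and are not part of this hunk. A minimal sketch of what such a gauge looks like, assuming org.apache.flink.metrics.Gauge and getOffset()/getCommittedOffset() accessors on KafkaTopicPartitionState (names outside this diff are assumptions, not confirmed by the patch):

	// Sketch only: a Gauge<Long> that reports either the current or the committed
	// offset of one partition, reading from the shared partition state object.
	private static class OffsetGauge implements Gauge<Long> {

		private final KafkaTopicPartitionState<?> ktp;
		private final OffsetGaugeType gaugeType;

		OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
			this.ktp = ktp;
			this.gaugeType = gaugeType;
		}

		@Override
		public Long getValue() {
			switch (gaugeType) {
				case COMMITTED_OFFSET:
					// assumed accessor on KafkaTopicPartitionState
					return ktp.getCommittedOffset();
				case CURRENT_OFFSET:
					// assumed accessor on KafkaTopicPartitionState
					return ktp.getOffset();
				default:
					throw new RuntimeException("Unknown gauge type: " + gaugeType);
			}
		}
	}

Because each gauge holds a reference to the same KafkaTopicPartitionState instance the fetcher updates, every metric report reflects the latest offset without extra synchronization. Assuming the group name constants resolve to values along the lines of "topic", "partition", "currentOffsets" and "committedOffsets", a registered metric would be scoped roughly as KafkaConsumer.topic.<topic>.partition.<id>.currentOffsets; the exact values live in KafkaConsumerMetricConstants, which this hunk only references.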