diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index d0b61e2f0504f..394d1239a92f8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -19,6 +19,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -31,7 +32,6 @@
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -179,9 +179,9 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSc
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -198,10 +198,11 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSc
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
 			runtimeContext.getTaskNameWithSubtasks(),
-			runtimeContext.getMetricGroup(),
 			deserializer,
 			properties,
 			pollTimeout,
+			runtimeContext.getMetricGroup(),
+			consumerMetricGroup,
 			useMetrics);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index cddccdc4770b2..9b9b217b205f0 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -55,10 +55,11 @@ public Kafka010Fetcher(
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -69,10 +70,11 @@ public Kafka010Fetcher(
 			autoWatermarkInterval,
 			userCodeClassLoader,
 			taskNameWithSubtasks,
-			metricGroup,
 			deserializer,
 			kafkaProperties,
 			pollTimeout,
+			subtaskMetricGroup,
+			consumerMetricGroup,
 			useMetrics);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 45ceadc1863fc..f57fbea67e147 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -126,10 +126,11 @@ public Void answer(InvocationOnMock invocation) {
 				10,
 				getClass().getClassLoader(),
 				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -262,10 +263,11 @@ public Void answer(InvocationOnMock invocation) {
 				10,
 				getClass().getClassLoader(),
 				"taskname-with-subtask",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -376,10 +378,11 @@ public void testCancellationWhenEmitBlocks() throws Exception {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 3718476b80e24..312c8c46403ab 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -19,6 +19,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -43,6 +44,7 @@
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;
 
 /**
@@ -217,7 +219,8 @@ private FlinkKafkaConsumer08(
 			deserializer,
 			getLong(
 				checkNotNull(props, "props"),
-				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+			!getBoolean(props, KEY_DISABLE_METRICS, false));
 
 		this.kafkaProperties = props;
@@ -235,9 +238,9 @@ private FlinkKafkaConsumer08(
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
 				? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
@@ -252,6 +255,7 @@ private FlinkKafkaConsumer08(
 			deserializer,
 			kafkaProperties,
 			autoCommitInterval,
+			consumerMetricGroup,
 			useMetrics);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 23d87d7707fac..a2edb72d2dac7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -48,7 +48,6 @@
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -98,6 +97,7 @@ public Kafka08Fetcher(
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long autoCommitInterval,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -107,6 +107,7 @@ public Kafka08Fetcher(
 			runtimeContext.getProcessingTimeService(),
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
+			consumerMetricGroup,
 			useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
@@ -178,12 +179,6 @@ public void runFetchLoop() throws Exception {
 			periodicCommitter.start();
 		}
 
-		// register offset metrics
-		if (useMetrics) {
-			final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP);
-			addOffsetStateGauge(kafkaMetricGroup);
-		}
-
 		// Main loop polling elements from the unassignedPartitions queue to the threads
 		while (running) {
 			// re-throw any exception from the concurrent fetcher threads
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 578a2d2ec9687..6e6051b35e40d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -19,6 +19,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -46,6 +47,7 @@
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
 import static org.apache.flink.util.PropertiesUtil.getLong;
 
 /**
@@ -208,7 +210,8 @@ private FlinkKafkaConsumer09(
 			deserializer,
 			getLong(
 				checkNotNull(props, "props"),
-				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
+			!getBoolean(props, KEY_DISABLE_METRICS, false));
 
 		this.properties = props;
 		setDeserializer(this.properties);
@@ -233,9 +236,9 @@ private FlinkKafkaConsumer09(
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception {
-
-		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup consumerMetricGroup,
+			boolean useMetrics) throws Exception {
 
 		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
 		// this overwrites whatever setting the user configured in the properties
@@ -252,10 +255,11 @@ private FlinkKafkaConsumer09(
 			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 			runtimeContext.getUserCodeClassLoader(),
 			runtimeContext.getTaskNameWithSubtasks(),
-			runtimeContext.getMetricGroup(),
 			deserializer,
 			properties,
 			pollTimeout,
+			runtimeContext.getMetricGroup(),
+			consumerMetricGroup,
 			useMetrics);
 	}
@@ -270,7 +274,7 @@ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
 	@Override
 	protected boolean getIsAutoCommitEnabled() {
-		return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
+		return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
 			PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 162cc09575ccf..58393600633cf 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -45,7 +45,6 @@
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -83,10 +82,11 @@ public Kafka09Fetcher(
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
 			String taskNameWithSubtasks,
-			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
+			MetricGroup subtaskMetricGroup,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		super(
 			sourceContext,
@@ -95,25 +95,24 @@ public Kafka09Fetcher(
 			watermarksPunctuated,
 			processingTimeProvider,
 			autoWatermarkInterval,
-			userCodeClassLoader,
+			userCodeClassLoader.getParent(),
+			consumerMetricGroup,
 			useMetrics);
 
 		this.deserializer = deserializer;
 		this.handover = new Handover();
 
-		final MetricGroup kafkaMetricGroup = metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
-		addOffsetStateGauge(kafkaMetricGroup);
-
 		this.consumerThread = new KafkaConsumerThread(
 			LOG,
 			handover,
 			kafkaProperties,
 			unassignedPartitionsQueue,
-			kafkaMetricGroup,
 			createCallBridge(),
 			getFetcherName() + " for " + taskNameWithSubtasks,
 			pollTimeout,
-			useMetrics);
+			useMetrics,
+			consumerMetricGroup,
+			subtaskMetricGroup);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index c995910c7cf88..38e8a41d474cf 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -80,9 +80,6 @@ public class KafkaConsumerThread extends Thread {
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
-	/** We get this from the outside to publish metrics. **/
-	private final MetricGroup kafkaMetricGroup;
-
 	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
 	private final KafkaConsumerCallBridge consumerCallBridge;
 
@@ -92,6 +89,16 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag whether to add Kafka's metrics to the Flink metrics. */
 	private final boolean useMetrics;
 
+	/**
+	 * @deprecated We should only be publishing to the {@link #consumerMetricGroup}.
+	 *             This is kept to retain compatibility for metrics.
+	 **/
+	@Deprecated
+	private final MetricGroup subtaskMetricGroup;
+
+	/** We get this from the outside to publish metrics. */
+	private final MetricGroup consumerMetricGroup;
+
 	/** Reference to the Kafka consumer, once it is created. */
 	private volatile KafkaConsumer<byte[], byte[]> consumer;
 
@@ -118,11 +125,12 @@ public KafkaConsumerThread(
 			Handover handover,
 			Properties kafkaProperties,
 			ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
-			MetricGroup kafkaMetricGroup,
 			KafkaConsumerCallBridge consumerCallBridge,
 			String threadName,
 			long pollTimeout,
-			boolean useMetrics) {
+			boolean useMetrics,
+			MetricGroup consumerMetricGroup,
+			MetricGroup subtaskMetricGroup) {
 
 		super(threadName);
 		setDaemon(true);
@@ -130,7 +138,8 @@ public KafkaConsumerThread(
 		this.log = checkNotNull(log);
 		this.handover = checkNotNull(handover);
 		this.kafkaProperties = checkNotNull(kafkaProperties);
-		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
+		this.subtaskMetricGroup = checkNotNull(subtaskMetricGroup);
 		this.consumerCallBridge = checkNotNull(consumerCallBridge);
 		this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);
 
@@ -178,7 +187,10 @@ public void run() {
 			} else {
 				// we have Kafka metrics, register them
 				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-					kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+					consumerMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+
+					// TODO this metric is kept for compatibility purposes; should remove in the future
+					subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
 				}
 			}
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index e4e276a966971..27b67f1098413 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -126,10 +126,11 @@ public Void answer(InvocationOnMock invocation) {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -261,10 +262,11 @@ public Void answer(InvocationOnMock invocation) {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
@@ -375,10 +377,11 @@ public void testCancellationWhenEmitBlocks() throws Exception {
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
 				"task_name",
-				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup(),
 				false);
 
 		// ----- run the fetcher -----
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 383eb13ea339b..3d4018714f2c1 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -716,11 +716,12 @@ public TestKafkaConsumerThread(
 				handover,
 				new Properties(),
 				unassignedPartitionsQueue,
-				new UnregisteredMetricsGroup(),
 				new KafkaConsumerCallBridge(),
 				"test-kafka-consumer-thread",
 				0,
-				false);
+				false,
+				new UnregisteredMetricsGroup(),
+				new UnregisteredMetricsGroup());
 
 			this.mockConsumer = mockConsumer;
 		}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 660af21e92fe7..d126301c07ded 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -29,6 +29,7 @@
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -65,6 +66,7 @@
 
 import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER;
 import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -192,6 +194,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	//  internal metrics
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Flag indicating whether or not metrics should be exposed.
+	 * If {@code true}, offset metrics (e.g. current offset, committed offset) and
+	 * Kafka-shipped metrics will be registered.
+	 */
+	private final boolean useMetrics;
+
 	/** Counter for successful Kafka offset commits. */
 	private transient Counter successfulCommits;
 
@@ -221,7 +230,8 @@ public FlinkKafkaConsumerBase(
 			List<String> topics,
 			Pattern topicPattern,
 			KeyedDeserializationSchema<T> deserializer,
-			long discoveryIntervalMillis) {
+			long discoveryIntervalMillis,
+			boolean useMetrics) {
 
 		this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
@@ -229,6 +239,8 @@ public FlinkKafkaConsumerBase(
 			discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
 			"Cannot define a negative value for the topic / partition discovery interval.");
 		this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+		this.useMetrics = useMetrics;
 	}
 
 	// ------------------------------------------------------------------------
@@ -560,7 +572,9 @@ public void onException(Throwable cause) {
 			periodicWatermarkAssigner,
 			punctuatedWatermarkAssigner,
 			(StreamingRuntimeContext) getRuntimeContext(),
-			offsetCommitMode);
+			offsetCommitMode,
+			getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
+			useMetrics);
 
 		if (!running) {
 			return;
@@ -839,7 +853,9 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 			StreamingRuntimeContext runtimeContext,
-			OffsetCommitMode offsetCommitMode) throws Exception;
+			OffsetCommitMode offsetCommitMode,
+			MetricGroup kafkaMetricGroup,
+			boolean useMetrics) throws Exception;
 
 	/**
 	 * Creates the partition discoverer that is used to find new partitions for this subtask.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index c48a2b5ccd659..27afbca8cc507 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -91,9 +91,6 @@ public abstract class AbstractFetcher<T, KPH> {
 	/** The mode describing whether the fetcher also generates timestamps and watermarks. */
 	protected final int timestampWatermarkMode;
 
-	/** Flag whether to register metrics for the fetcher. */
-	protected final boolean useMetrics;
-
 	/**
 	 * Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
@@ -124,10 +121,10 @@ protected AbstractFetcher(
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
+			MetricGroup consumerMetricGroup,
 			boolean useMetrics) throws Exception {
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
-		this.useMetrics = useMetrics;
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 
 		// figure out what we watermark mode we will be using
@@ -171,6 +168,10 @@ protected AbstractFetcher(
 			unassignedPartitionsQueue.add(partition);
 		}
 
+		if (useMetrics) {
+			addOffsetStateGauge(checkNotNull(consumerMetricGroup));
+		}
+
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
 			@SuppressWarnings("unchecked")
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 84f0e388e1969..a533ccaa50257 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -376,7 +377,8 @@ private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<
 				(KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
-				discoveryInterval);
+				discoveryInterval,
+				false);
 
 			this.fetcher = fetcher;
 			this.partitions = partitions;
@@ -393,7 +395,9 @@ private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return fetcher;
 		}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index e6fd3fe1775f6..41b609ec31995 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -640,7 +640,8 @@ private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<
 				(KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class),
-				PARTITION_DISCOVERY_DISABLED);
+				PARTITION_DISCOVERY_DISABLED,
+				false);
 
 			this.testFetcher = testFetcher;
 			this.testPartitionDiscoverer = testPartitionDiscoverer;
@@ -655,7 +656,9 @@ private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				StreamingRuntimeContext runtimeContext,
-				OffsetCommitMode offsetCommitMode) throws Exception {
+				OffsetCommitMode offsetCommitMode,
+				MetricGroup consumerMetricGroup,
+				boolean useMetrics) throws Exception {
 			return this.testFetcher;
 		}
@@ -752,6 +755,7 @@ private MockFetcher(HashMap<KafkaTopicPartition, Long>... stateSnapshotsToReturn
 			new TestProcessingTimeService(),
 			0,
 			MockFetcher.class.getClassLoader(),
+			new UnregisteredMetricsGroup(),
 			false);
 
 		this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 6fe1d6f6326e0..d276cfbb46d30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -411,6 +412,7 @@ protected TestFetcher(
 			processingTimeProvider,
 			autoWatermarkInterval,
 			TestFetcher.class.getClassLoader(),
+			new UnregisteredMetricsGroup(),
 			false);
 	}
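
Reviewer note (illustrative, not part of the patch): the change moves offset-gauge registration into AbstractFetcher's constructor, guarded by useMetrics, and mirrors Kafka-shipped metrics into both the new "KafkaConsumer" metric group and the legacy subtask group so existing dashboards keep working. A minimal sketch of that dual registration, using only the public MetricGroup/Gauge API; the metric name "records-consumed-rate" and the recordsConsumedRate field are hypothetical stand-ins:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class DualMetricRegistrationSketch {

	// Hypothetical stand-in for one Kafka-shipped metric value.
	private static volatile double recordsConsumedRate = 0.0;

	public static void main(String[] args) {
		// In the connector these come from the runtime: the "KafkaConsumer" subgroup
		// created in FlinkKafkaConsumerBase#run, and the subtask's own metric group.
		// UnregisteredMetricsGroup (also used by the tests above) keeps this self-contained.
		MetricGroup consumerMetricGroup = new UnregisteredMetricsGroup();
		MetricGroup subtaskMetricGroup = new UnregisteredMetricsGroup();

		Gauge<Double> gauge = new Gauge<Double>() {
			@Override
			public Double getValue() {
				return recordsConsumedRate;
			}
		};

		// New location: the dedicated consumer metric group.
		consumerMetricGroup.gauge("records-consumed-rate", gauge);

		// Legacy location, marked @Deprecated in the patch; registering the same
		// gauge here keeps old metric paths from silently disappearing.
		subtaskMetricGroup.gauge("records-consumed-rate", gauge);
	}
}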