[FLINK-8419] [kafka] Move consumer metric group registration to FlinkKafkaConsumerBase

This commit refactors the registration of the consumer metric group
(user scope "KafkaConsumer") into FlinkKafkaConsumerBase. Previously,
the registration was scattered across the Kafka version-specific
subclasses.
tzulitai committed Feb 6, 2018
1 parent 2330e8d commit 0a1ce00
Showing 15 changed files with 109 additions and 58 deletions.
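The FlinkKafkaConsumerBase hunk itself is not among the excerpts below, so here is a minimal, self-contained sketch of the pattern this commit introduces. MetricGroup is a stand-in for org.apache.flink.metrics.MetricGroup, and the class and method names (ConsumerBase, run, createFetcher, SimpleMetricGroup) are illustrative assumptions, not the actual Flink sources:

import java.util.HashMap;
import java.util.Map;

public class ConsumerMetricGroupSketch {

	// Stand-in for org.apache.flink.metrics.MetricGroup.
	interface MetricGroup {
		MetricGroup addGroup(String name);
	}

	static class SimpleMetricGroup implements MetricGroup {
		private final Map<String, MetricGroup> children = new HashMap<>();

		@Override
		public MetricGroup addGroup(String name) {
			// reuse an existing child group, or create it on first access
			return children.computeIfAbsent(name, k -> new SimpleMetricGroup());
		}
	}

	// Illustrative counterpart of FlinkKafkaConsumerBase after this commit.
	abstract static class ConsumerBase {
		// user scope under which all consumer metrics are registered
		static final String KAFKA_CONSUMER_METRICS_GROUP = "KafkaConsumer";

		final void run(MetricGroup subtaskMetricGroup) {
			// the base class registers the group exactly once ...
			MetricGroup consumerMetricGroup =
					subtaskMetricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
			// ... and hands it to the version-specific fetcher, mirroring
			// the new consumerMetricGroup parameters in the hunks below
			createFetcher(consumerMetricGroup);
		}

		// previously, each subclass called addGroup("KafkaConsumer") itself
		abstract void createFetcher(MetricGroup consumerMetricGroup);
	}

	public static void main(String[] args) {
		ConsumerBase consumer = new ConsumerBase() {
			@Override
			void createFetcher(MetricGroup consumerMetricGroup) {
				System.out.println("fetcher received: " + consumerMetricGroup);
			}
		};
		consumer.run(new SimpleMetricGroup());
	}
}

The hunks below show the other half of the change: each version-specific createFetcher override and fetcher constructor now accepts the already-registered consumerMetricGroup instead of calling addGroup itself. Since addGroup appends a scope component, the consumer metrics should surface under an identifier ending in KafkaConsumer.<metricName>.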
@@ -19,6 +19,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -31,7 +32,6 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -179,9 +179,9 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSc
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode) throws Exception {

boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {

// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
@@ -198,10 +198,11 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSc
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
runtimeContext.getMetricGroup(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}

@@ -55,10 +55,11 @@ public Kafka010Fetcher(
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
super(
sourceContext,
@@ -69,10 +70,11 @@
autoWatermarkInterval,
userCodeClassLoader,
taskNameWithSubtasks,
metricGroup,
deserializer,
kafkaProperties,
pollTimeout,
subtaskMetricGroup,
consumerMetricGroup,
useMetrics);
}

@@ -126,10 +126,11 @@ public Void answer(InvocationOnMock invocation) {
10,
getClass().getClassLoader(),
"taskname-with-subtask",
new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
new UnregisteredMetricsGroup(),
new UnregisteredMetricsGroup(),
false);

// ----- run the fetcher -----
@@ -262,10 +263,11 @@ public Void answer(InvocationOnMock invocation) {
10,
getClass().getClassLoader(),
"taskname-with-subtask",
new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
new UnregisteredMetricsGroup(),
new UnregisteredMetricsGroup(),
false);

// ----- run the fetcher -----
@@ -376,10 +378,11 @@ public void testCancellationWhenEmitBlocks() throws Exception {
10, /* watermark interval */
this.getClass().getClassLoader(),
"task_name",
new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
new UnregisteredMetricsGroup(),
new UnregisteredMetricsGroup(),
false);

// ----- run the fetcher -----
@@ -19,6 +19,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -43,6 +44,7 @@
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getBoolean;
import static org.apache.flink.util.PropertiesUtil.getLong;

/**
@@ -217,7 +219,8 @@ private FlinkKafkaConsumer08(
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));

this.kafkaProperties = props;

@@ -235,9 +238,9 @@ private FlinkKafkaConsumer08(
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode) throws Exception {

boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {

long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
@@ -252,6 +255,7 @@ private FlinkKafkaConsumer08(
deserializer,
kafkaProperties,
autoCommitInterval,
consumerMetricGroup,
useMetrics);
}

@@ -48,7 +48,6 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -98,6 +97,7 @@ public Kafka08Fetcher(
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long autoCommitInterval,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
super(
sourceContext,
@@ -107,6 +107,7 @@
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
consumerMetricGroup,
useMetrics);

this.deserializer = checkNotNull(deserializer);
@@ -178,12 +179,6 @@ public void runFetchLoop() throws Exception {
periodicCommitter.start();
}

// register offset metrics
if (useMetrics) {
final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP);
addOffsetStateGauge(kafkaMetricGroup);
}

// Main loop polling elements from the unassignedPartitions queue to the threads
while (running) {
// re-throw any exception from the concurrent fetcher threads
@@ -19,6 +19,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -46,6 +47,7 @@
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getBoolean;
import static org.apache.flink.util.PropertiesUtil.getLong;

/**
@@ -208,7 +210,8 @@ private FlinkKafkaConsumer09(
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));

this.properties = props;
setDeserializer(this.properties);
@@ -233,9 +236,9 @@ private FlinkKafkaConsumer09(
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode) throws Exception {

boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {

// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
@@ -252,10 +255,11 @@ private FlinkKafkaConsumer09(
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
runtimeContext.getMetricGroup(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}

@@ -270,7 +274,7 @@ protected AbstractPartitionDiscoverer createPartitionDiscoverer(

@Override
protected boolean getIsAutoCommitEnabled() {
return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}

@@ -45,7 +45,6 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
import static org.apache.flink.util.Preconditions.checkState;

/**
@@ -83,10 +82,11 @@ public Kafka09Fetcher(
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
super(
sourceContext,
@@ -95,25 +95,24 @@
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
userCodeClassLoader.getParent(),
consumerMetricGroup,
useMetrics);

this.deserializer = deserializer;
this.handover = new Handover();

final MetricGroup kafkaMetricGroup = metricGroup.addGroup(KAFKA_CONSUMER_METRICS_GROUP);
addOffsetStateGauge(kafkaMetricGroup);

this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
kafkaMetricGroup,
createCallBridge(),
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics);
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
}

// ------------------------------------------------------------------------
@@ -80,9 +80,6 @@ public class KafkaConsumerThread extends Thread {
/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;

/** We get this from the outside to publish metrics. **/
private final MetricGroup kafkaMetricGroup;

/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
private final KafkaConsumerCallBridge consumerCallBridge;

@@ -92,6 +89,16 @@
/** Flag whether to add Kafka's metrics to the Flink metrics. */
private final boolean useMetrics;

/**
* @deprecated We should only be publishing to the {{@link #consumerMetricGroup}}.
* This is kept to retain compatibility for metrics.
**/
@Deprecated
private final MetricGroup subtaskMetricGroup;

/** We get this from the outside to publish metrics. */
private final MetricGroup consumerMetricGroup;

/** Reference to the Kafka consumer, once it is created. */
private volatile KafkaConsumer<byte[], byte[]> consumer;

@@ -118,19 +125,21 @@ public KafkaConsumerThread(
Handover handover,
Properties kafkaProperties,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
MetricGroup kafkaMetricGroup,
KafkaConsumerCallBridge consumerCallBridge,
String threadName,
long pollTimeout,
boolean useMetrics) {
boolean useMetrics,
MetricGroup consumerMetricGroup,
MetricGroup subtaskMetricGroup) {

super(threadName);
setDaemon(true);

this.log = checkNotNull(log);
this.handover = checkNotNull(handover);
this.kafkaProperties = checkNotNull(kafkaProperties);
this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.subtaskMetricGroup = checkNotNull(subtaskMetricGroup);
this.consumerCallBridge = checkNotNull(consumerCallBridge);

this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);
@@ -178,7 +187,10 @@ public void run() {
} else {
// we have Kafka metrics, register them
for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
consumerMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));

// TODO this metric is kept for compatibility purposes; should remove in the future
subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
}
}
}