[FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer
This closes apache#3746.
tzulitai committed Jul 1, 2017
1 parent b8c8f20 commit 085d4db
Showing 30 changed files with 2,981 additions and 1,006 deletions.
15 changes: 15 additions & 0 deletions docs/dev/connectors/kafka.md
@@ -292,6 +292,21 @@ Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

### Kafka Consumers Partition Discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
job starts running) will be consumed from the earliest possible offset.

By default, partition discovery is disabled. To enable it, set a non-negative value
for `flink.partition-discovery.interval-millis` in the provided properties config,
representing the discovery interval in milliseconds.
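
For example, the interval can be set on the `Properties` instance passed to the consumer constructor. The following
is a minimal sketch (the 0.10 consumer is shown; other consumer versions take the same property). The broker address,
group id, and topic name are placeholders:

```java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
properties.setProperty("group.id", "my-consumer-group");          // placeholder group id
// check for newly created partitions of the subscribed topics every 10 seconds
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties));
```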

<span class="label label-danger">Limitation</span> When the consumer is restored from a savepoint from Flink versions
prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run. If enabled, the restore would fail
with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
then restore again from that.

### Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka Consumer allows configuring the behaviour of how offsets
FlinkKafkaConsumer010.java
@@ -22,8 +22,11 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -159,4 +162,13 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
pollTimeout,
useMetrics);
}

@Override
protected AbstractPartitionDiscoverer createPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks) {

return new Kafka010PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
}
}
Kafka010PartitionDiscoverer.java (new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

import java.util.Properties;

/**
* A partition discoverer that can be used to discover topic and partition metadata
* from Kafka brokers via the Kafka 0.10 high-level consumer API.
*/
public class Kafka010PartitionDiscoverer extends Kafka09PartitionDiscoverer {

public Kafka010PartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks,
Properties kafkaProperties) {

super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
}
}
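
The discoverer is normally created and driven by the consumer itself (via `createPartitionDiscoverer`), but for
illustration it could be exercised standalone. The sketch below assumes the `open()` / `discoverPartitions()` /
`close()` lifecycle introduced on `AbstractPartitionDiscoverer` in this change; the broker address and topic name
are placeholders, and error handling is omitted:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker address

// subscribe to a fixed list of topics (no topic pattern)
KafkaTopicsDescriptor topicsDescriptor =
    new KafkaTopicsDescriptor(Collections.singletonList("my-topic"), null);

// this discoverer acts as subtask 0 of a parallelism-1 source
Kafka010PartitionDiscoverer discoverer =
    new Kafka010PartitionDiscoverer(topicsDescriptor, 0, 1, props);

discoverer.open();
List<KafkaTopicPartition> initialPartitions = discoverer.discoverPartitions();
discoverer.close();
```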
Kafka010FetcherTest.java
@@ -16,16 +16,13 @@
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
FlinkKafkaConsumer08.java
@@ -22,38 +22,26 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;

import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;

import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getInt;
import static org.apache.flink.util.PropertiesUtil.getLong;

/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -172,9 +160,12 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer);
super(
topics,
null,
deserializer,
getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));

checkNotNull(topics, "topics");
this.kafkaProperties = checkNotNull(props, "props");

// validate the zookeeper properties
@@ -212,22 +203,12 @@ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> d
}

@Override
protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
// Connect to a broker to get the partitions for all topics
List<KafkaTopicPartition> partitionInfos =
KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties));

if (partitionInfos.size() == 0) {
throw new RuntimeException(
"Unable to retrieve any partitions for the requested topics " + topics +
". Please check previous log entries");
}

if (LOG.isInfoEnabled()) {
logPartitionInfo(LOG, partitionInfos);
}
protected AbstractPartitionDiscoverer createPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks) {

return partitionInfos;
return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
}

@Override
@@ -237,104 +218,12 @@ protected boolean getIsAutoCommitEnabled() {
}

// ------------------------------------------------------------------------
// Kafka / ZooKeeper communication utilities
// Kafka / ZooKeeper configuration utilities
// ------------------------------------------------------------------------

/**
* Send request to Kafka to get partitions for topic.
*
* @param topics The name of the topics.
* @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
*/
public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);

checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
String[] seedBrokers = seedBrokersConfString.split(",");
List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();

final String clientId = "flink-kafka-consumer-partition-lookup";
final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);

Random rnd = new Random();
retryLoop: for (int retry = 0; retry < numRetries; retry++) {
// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
// parallel source instances start. Still, we try all available brokers.
int index = rnd.nextInt(seedBrokers.length);
brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
String seedBroker = seedBrokers[index];
LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
if (++index == seedBrokers.length) {
index = 0;
}

URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);

TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List<TopicMetadata> metaData = resp.topicsMetadata();

// clear in case we have an incomplete list from previous tries
partitions.clear();
for (TopicMetadata item : metaData) {
if (item.errorCode() != ErrorMapping.NoError()) {
// warn and try more brokers
LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
continue brokersLoop;
}
if (!topics.contains(item.topic())) {
LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
continue brokersLoop;
}
for (PartitionMetadata part : item.partitionsMetadata()) {
Node leader = brokerToNode(part.leader());
KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
partitions.add(pInfo);
}
}
break retryLoop; // leave the loop through the brokers
}
catch (Exception e) {
//validates seed brokers in case of a ClosedChannelException
validateSeedBrokers(seedBrokers, e);
LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
seedBroker, topics, e.getClass().getName(), e.getMessage());
LOG.debug("Detailed trace", e);
// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// sleep shorter.
}
} finally {
if (consumer != null) {
consumer.close();
}
}
} // brokers loop
} // retries loop
return partitions;
}

/**
* Turn a broker instance into a node instance.
* @param broker broker instance
* @return Node representing the given broker
*/
private static Node brokerToNode(Broker broker) {
return new Node(broker.id(), broker.host(), broker.port());
}

/**
* Validate the ZK configuration, checking for required parameters.
*
* @param props Properties to check
*/
protected static void validateZooKeeperConfig(Properties props) {
@@ -363,36 +252,6 @@ protected static void validateZooKeeperConfig(Properties props) {
}
}

/**
* Validate that at least one seed broker is valid in case of a
* ClosedChannelException.
*
* @param seedBrokers
* array containing the seed brokers e.g. ["host1:port1",
* "host2:port2"]
* @param exception
* instance
*/
private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
if (!(exception instanceof ClosedChannelException)) {
return;
}
int unknownHosts = 0;
for (String broker : seedBrokers) {
URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
try {
InetAddress.getByName(brokerUrl.getHost());
} catch (UnknownHostException e) {
unknownHosts++;
}
}
// throw meaningful exception if all the provided hosts are invalid
if (unknownHosts == seedBrokers.length) {
throw new IllegalArgumentException("All the servers provided in: '"
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
}
}

/**
* Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
* the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception