[FLINK-4723] [kafka] Unify committed offsets to Kafka to be next record to process

This closes apache#2580
tzulitai committed Oct 18, 2016
1 parent 069de27 commit f46ca39
Showing 18 changed files with 559 additions and 245 deletions.
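The heart of this change is an offset convention: the connector's internal checkpoint state holds the offset of the last processed record, while offsets committed to Kafka / ZooKeeper represent the next record to process. The renamed commitInternalOffsetsToKafka / prepareAndCommitOffsets methods therefore add 1 before committing, and the fetcher subtracts 1 when it seeds its state from a previously committed offset. The following standalone sketch (hypothetical names, not Flink code) only illustrates that +1 / -1 relationship:

import java.util.HashMap;
import java.util.Map;

public class OffsetConventionSketch {

    // internal -> committable: commit the offset of the NEXT record to process
    static Map<String, Long> toCommittableOffsets(Map<String, Long> lastProcessedOffsets) {
        Map<String, Long> committable = new HashMap<>();
        for (Map.Entry<String, Long> e : lastProcessedOffsets.entrySet()) {
            Long lastProcessed = e.getValue();
            if (lastProcessed != null && lastProcessed >= 0) {
                committable.put(e.getKey(), lastProcessed + 1);
            }
        }
        return committable;
    }

    // committed -> internal: restore the offset of the LAST processed record
    static long toInternalOffset(long committedOffset) {
        return committedOffset - 1;
    }

    public static void main(String[] args) {
        Map<String, Long> internal = new HashMap<>();
        internal.put("test-topic-0", 42L);                   // record 42 was the last one processed
        System.out.println(toCommittableOffsets(internal));  // {test-topic-0=43}: next record to process
        System.out.println(toInternalOffset(43L));           // 42: back to the internal representation
    }
}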
@@ -143,7 +143,7 @@ public void run() {
@Override
public void run() {
try {
fetcher.commitSpecificOffsetsToKafka(testCommitData);
fetcher.commitInternalOffsetsToKafka(testCommitData);
} catch (Throwable t) {
commitError.set(t);
}
@@ -255,7 +255,7 @@ public void run() {

// ----- trigger the first offset commit -----

fetcher.commitSpecificOffsetsToKafka(testCommitData1);
fetcher.commitInternalOffsetsToKafka(testCommitData1);
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();

for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -272,7 +272,7 @@ else if (partition.topic().equals("another")) {

// ----- trigger the second offset commit -----

fetcher.commitSpecificOffsetsToKafka(testCommitData2);
fetcher.commitInternalOffsetsToKafka(testCommitData2);
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();

for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -297,4 +297,4 @@ else if (partition.topic().equals("another")) {
throw new Exception("Exception in the fetcher", caughtError);
}
}
}
}
@@ -131,6 +131,24 @@ public void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}

// --- offset committing ---

@Test(timeout = 60000)
public void testCommitOffsetsToKafka() throws Exception {
runCommitOffsetsToKafka();
}

@Test(timeout = 60000)
public void testStartFromKafkaCommitOffsets() throws Exception {
runStartFromKafkaCommitOffsets();
}

// TODO: This test will not pass until FLINK-4727 is resolved
// @Test(timeout = 60000)
// public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
// runAutoOffsetRetrievalAndCommitToKafka();
// }

/**
* Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
*/
@@ -20,7 +20,6 @@

import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
@@ -35,6 +34,9 @@
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
import org.slf4j.Logger;
@@ -117,6 +119,11 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
return stream.addSink(prod);
}

@Override
public KafkaOffsetHandler createOffsetHandler(Properties props) {
return new KafkaOffsetHandlerImpl(props);
}

@Override
public void restartBroker(int leaderId) throws Exception {
brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
@@ -213,11 +220,11 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("auto.commit.enable", "false");
standardProps.setProperty("enable.auto.commit", "false");
standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
}

@Override
@@ -381,4 +388,24 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
}

private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {

private final KafkaConsumer<byte[], byte[]> offsetClient;

public KafkaOffsetHandlerImpl(Properties props) {
offsetClient = new KafkaConsumer<>(props);
}

@Override
public Long getCommittedOffset(String topicName, int partition) {
OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
return (committed != null) ? committed.offset() : null;
}

@Override
public void close() {
offsetClient.close();
}
}

}
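For reference, below is a rough standalone usage sketch of the same KafkaConsumer#committed call that KafkaOffsetHandlerImpl relies on. The broker address, group id, and topic are placeholders (in the tests these values come from standardProps), and the deserializer settings are only there to satisfy the consumer's configuration checks:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.setProperty("group.id", "flink-tests");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> offsetClient = new KafkaConsumer<>(props)) {
            // committed() returns null if this group has never committed an offset for the partition
            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition("test-topic", 0));
            System.out.println(committed != null ? Long.toString(committed.offset()) : "no committed offset");
        }
    }
}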
@@ -140,11 +140,13 @@ public void runFetchLoop() throws Exception {
}
}

Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setOffset(offset);
Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
if (zkOffset != null) {
// the offset in ZK represents the "next record to process", so we need to subtract 1 from it
// to correctly represent our internally checkpointed offsets
partition.setOffset(zkOffset - 1);
}
}
}
@@ -324,10 +326,11 @@ public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partitio
// ------------------------------------------------------------------------

@Override
public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
zkHandler.writeOffsets(offsets);
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
}

// Set committed offsets in topic partition state
@@ -62,12 +62,12 @@ public void run() {
Thread.sleep(commitInterval);

// create a deep copy of the current offsets
HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length);
HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
}

offsetHandler.writeOffsets(currentOffsets);
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
}
}
catch (Throwable t) {
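The periodic committer above boils down to a simple pattern: a daemon thread sleeps for the commit interval, snapshots the current "last processed" offsets, and hands the copy to the offset handler, which performs the +1 before writing to ZooKeeper. A condensed sketch with assumed, non-Flink types:

import java.util.HashMap;
import java.util.Map;

class PeriodicOffsetCommitter extends Thread {

    private final long commitIntervalMillis;
    private final Map<String, Long> partitionOffsets;   // live view of last-processed offsets per partition
    private volatile boolean running = true;

    PeriodicOffsetCommitter(long commitIntervalMillis, Map<String, Long> partitionOffsets) {
        this.commitIntervalMillis = commitIntervalMillis;
        this.partitionOffsets = partitionOffsets;
        setDaemon(true);
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(commitIntervalMillis);
                // commit a consistent snapshot, not the live map
                Map<String, Long> offsetsToCommit = new HashMap<>(partitionOffsets);
                commit(offsetsToCommit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // shutdown() interrupts the sleep
        }
    }

    // stand-in for ZookeeperOffsetHandler#prepareAndCommitOffsets: the handler adds 1 before writing
    private void commit(Map<String, Long> offsetsToCommit) {
        offsetsToCommit.forEach((partition, lastProcessed) ->
                System.out.println("commit " + partition + " -> " + (lastProcessed + 1)));
    }

    void shutdown() {
        running = false;
        interrupt();
    }
}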
@@ -77,29 +77,30 @@ public ZookeeperOffsetHandler(Properties props) {
// ------------------------------------------------------------------------

/**
* Writes given set of offsets for Kafka partitions to ZooKeeper.
* Commits offsets for Kafka partitions to ZooKeeper. The offsets given to this method should be the offsets of
* the last processed records; this method takes care of incrementing the offsets by 1 before committing them, so
* that the offsets committed to ZooKeeper represent the next record to process.
*
* @param offsetsToWrite The offsets for the partitions to write.
* @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
* @throws Exception The method forwards exceptions.
*/
public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
KafkaTopicPartition tp = entry.getKey();
long offset = entry.getValue();

if (offset >= 0) {
setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
Long lastProcessedOffset = entry.getValue();
if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
}
}
}

/**
* Reads the committed offsets for the given Kafka partitions from ZooKeeper.
* @param partitions The partitions to read offsets for.
* @return The mapping from partition to offset.
* @throws Exception This method forwards exceptions.
*/
public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
for (KafkaTopicPartition tp : partitions) {
Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
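The setOffsetInZooKeeper / getOffsetFromZooKeeper helpers referenced above are not part of this diff; the sketch below is an assumed approximation of reading and writing a single offset under the Kafka 0.8 high-level-consumer ZooKeeper layout with Curator (the path layout and error handling are simplified, not the actual Flink implementation):

import org.apache.curator.framework.CuratorFramework;

public class ZkOffsetSketch {

    // assumed layout: /consumers/<group>/offsets/<topic>/<partition>
    static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    static void writeOffset(CuratorFramework curator, String groupId, String topic,
                            int partition, long offset) throws Exception {
        String path = offsetPath(groupId, topic, partition);
        byte[] data = Long.toString(offset).getBytes("UTF-8");
        if (curator.checkExists().forPath(path) == null) {
            curator.create().creatingParentsIfNeeded().forPath(path, data);
        } else {
            curator.setData().forPath(path, data);
        }
    }

    static Long readOffset(CuratorFramework curator, String groupId, String topic,
                           int partition) throws Exception {
        String path = offsetPath(groupId, topic, partition);
        if (curator.checkExists().forPath(path) == null) {
            return null;   // nothing committed yet for this partition
        }
        byte[] data = curator.getData().forPath(path);
        return (data == null || data.length == 0) ? null : Long.valueOf(new String(data, "UTF-8"));
    }
}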