MINOR: Improve EOS example exception handling (apache#8052)
The current EOS example mixes fatal and non-fatal error handling. This patch fixes this problem and simplifies the example.

Reviewers: Jason Gustafson <[email protected]>
Boyang Chen authored Feb 20, 2020
1 parent 8ab0994 commit 776565f
Showing 6 changed files with 75 additions and 101 deletions.
12 changes: 5 additions & 7 deletions examples/README
@@ -6,10 +6,8 @@ To run the demo:
 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
-5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`,
-   this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records
-6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`,
-   this means the same as the standalone demo, except consumers are using subscription mode.
-7. Some notes for exactly once demo:
-7.1. The Kafka server has to be on broker version 2.5 or higher to be able to run group mode.
-7.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments"
+5. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
+   this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records.
+6. Some notes for exactly once demo:
+6.1. The Kafka server has to be on broker version 2.5 or higher.
+6.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments"
3 changes: 3 additions & 0 deletions examples/src/main/java/kafka/examples/Consumer.java
@@ -24,6 +24,7 @@
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
@@ -37,6 +38,7 @@ public class Consumer extends ShutdownableThread {
 
     public Consumer(final String topic,
                     final String groupId,
+                    final Optional<String> instanceId,
                     final boolean readCommitted,
                     final int numMessageToConsume,
                     final CountDownLatch latch) {
@@ -45,6 +47,7 @@ public Consumer(final String topic,
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
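Note on the new Optional<String> instanceId parameter: when present, the consumer sets group.instance.id and becomes a static member of its group (KIP-345), so a restarted processor rejoins under the same identity without forcing a rebalance. A minimal sketch of the same pattern in isolation; the class name and bootstrap address are illustrative, not part of this commit:

import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipSketch {

    // Build a consumer that opts into static membership only when an instance id is supplied.
    static KafkaConsumer<Integer, String> buildConsumer(String groupId, Optional<String> instanceId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // group.instance.id marks this member as static; a restart within the
        // session timeout is treated as the same member, avoiding a full rebalance.
        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}

Passing Optional.empty() preserves the old dynamic-membership behavior, which is what the unchanged demos below continue to do.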
121 changes: 48 additions & 73 deletions examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -16,7 +16,6 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -34,8 +33,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,42 +46,32 @@ public class ExactlyOnceMessageProcessor extends Thread {
 
     private static final boolean READ_COMMITTED = true;
 
-    private final String mode;
     private final String inputTopic;
     private final String outputTopic;
-    private final String consumerGroupId;
-    private final int numPartitions;
-    private final int numInstances;
-    private final int instanceIdx;
     private final String transactionalId;
+    private final String groupInstanceId;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
 
     private final CountDownLatch latch;
 
-    public ExactlyOnceMessageProcessor(final String mode,
-                                       final String inputTopic,
+    public ExactlyOnceMessageProcessor(final String inputTopic,
                                        final String outputTopic,
-                                       final int numPartitions,
-                                       final int numInstances,
                                        final int instanceIdx,
                                        final CountDownLatch latch) {
-        this.mode = mode;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.consumerGroupId = "Eos-consumer";
-        this.numPartitions = numPartitions;
-        this.numInstances = numInstances;
-        this.instanceIdx = instanceIdx;
         this.transactionalId = "Processor-" + instanceIdx;
-        // If we are using the group mode, it is recommended to have a relatively short txn timeout
-        // in order to clear pending offsets faster.
-        final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000 : -1;
+        // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
+        final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use EOS.
         producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
-        consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get();
+        // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
+        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        consumer = new Consumer(inputTopic, "Eos-consumer",
+            Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get();
         this.latch = latch;
     }
 
@@ -93,78 +82,56 @@ public void run() {
 
         final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
 
-        // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5.
-        // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique
-        // across the consumer group instances.
-        if (this.mode.equals("groupMode")) {
-            consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-                @Override
-                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                    printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-                }
-
-                @Override
-                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                    printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                    messageRemaining.set(messagesRemaining(consumer));
-                }
-            });
-        } else {
-            // Do a range assignment of topic partitions.
-            List<TopicPartition> topicPartitions = new ArrayList<>();
-            int rangeSize = numPartitions / numInstances;
-            int startPartition = rangeSize * instanceIdx;
-            int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1);
-            for (int partition = startPartition; partition <= endPartition; partition++) {
-                topicPartitions.add(new TopicPartition(inputTopic, partition));
+        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
             }
 
-            consumer.assign(topicPartitions);
-            printWithTxnId("Manually assign partitions: " + topicPartitions);
-        }
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
+                messageRemaining.set(messagesRemaining(consumer));
+            }
+        });
 
         int messageProcessed = 0;
-        boolean abortPreviousTransaction = false;
         while (messageRemaining.get() > 0) {
-            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-            if (records.count() > 0) {
-                try {
-                    // Abort previous transaction if instructed.
-                    if (abortPreviousTransaction) {
-                        producer.abortTransaction();
-                        // The consumer fetch position also needs to be reset.
-                        resetToLastCommittedPositions(consumer);
-                        abortPreviousTransaction = false;
-                    }
+            try {
+                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
+                if (records.count() > 0) {
                     // Begin a new transaction session.
                     producer.beginTransaction();
                     for (ConsumerRecord<Integer, String> record : records) {
                         // Process the record and send to downstream.
                         ProducerRecord<Integer, String> customizedRecord = transform(record);
                         producer.send(customizedRecord);
                     }
-                    Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
-                    for (TopicPartition topicPartition : consumer.assignment()) {
-                        positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
-                    }
+
+                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
+
                     // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Under group mode, we must apply consumer group metadata for proper fencing.
-                    if (this.mode.equals("groupMode")) {
-                        producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
-                    } else {
-                        producer.sendOffsetsToTransaction(positions, consumerGroupId);
-                    }
+                    // Note that this API is only available for broker >= 2.5.
+                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
 
                     // Finish the transaction. All sent records should be visible for consumption now.
                     producer.commitTransaction();
                     messageProcessed += records.count();
-                } catch (CommitFailedException e) {
-                    // In case of a retriable exception, suggest aborting the ongoing transaction for correctness.
-                    abortPreviousTransaction = true;
-                } catch (ProducerFencedException | FencedInstanceIdException e) {
-                    throw new KafkaException("Encountered fatal error during processing: " + e.getMessage());
                 }
+            } catch (ProducerFencedException e) {
+                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
+            } catch (FencedInstanceIdException e) {
+                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
+            } catch (KafkaException e) {
+                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
+                // if the producer has hit a fatal error.
+                producer.abortTransaction();
+
+                // The consumer fetch position needs to be restored to the committed offset
+                // before the transaction started.
+                resetToLastCommittedPositions(consumer);
             }
+
             messageRemaining.set(messagesRemaining(consumer));
            printWithTxnId("Message remaining: " + messageRemaining);
         }
@@ -173,6 +140,14 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         latch.countDown();
     }
 
+    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        for (TopicPartition topicPartition : consumer.assignment()) {
+            offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+        }
+        return offsets;
+    }
+
     private void printWithTxnId(final String message) {
         System.out.println(transactionalId + ": " + message);
     }
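The reworked run() loop above draws a clean line between fatal and recoverable failures: ProducerFencedException and FencedInstanceIdException mean another process now owns this transactional.id or group.instance.id, so the processor rethrows and dies; any other KafkaException is treated as abortable, so the transaction is aborted and the consumer rewound. The rewind helper resetToLastCommittedPositions is called but not shown in these hunks; a plausible sketch, assuming it sits in the same class (whose existing imports already cover the types used):

    // Plausible implementation of the helper the diff calls but does not show:
    // rewind each assigned partition to its last committed offset (or to the
    // beginning if nothing was ever committed), so the next poll re-reads the
    // records of the aborted transaction.
    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        consumer.assignment().forEach(tp -> {
            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
            if (offsetAndMetadata != null)
                // Seek back to the committed offset so the aborted batch is re-polled.
                consumer.seek(tp, offsetAndMetadata.offset());
            else
                // Nothing committed yet: start over from the beginning of the partition.
                consumer.seekToBeginning(Collections.singleton(tp));
        });
    }

The rewind matters because the aborted records were already fetched; without the seek, the next poll would move past them and the output would silently lose data.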
3 changes: 2 additions & 1 deletion examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.errors.TimeoutException;
 
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +29,7 @@ public static void main(String[] args) throws InterruptedException {
         Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
         producerThread.start();
 
-        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
+        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
         consumerThread.start();
 
         if (!latch.await(5, TimeUnit.MINUTES)) {
32 changes: 17 additions & 15 deletions examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -25,19 +25,23 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 4 arguments:
- *   - mode: whether to run as standalone app, or a group
+ * This exactly once demo driver takes 3 arguments:
  *   - partition: number of partitions for input/output topic
  *   - instances: number of instances
  *   - records: number of records
- * An example argument list would be `groupMode 6 3 50000`
+ * An example argument list would be `6 3 50000`.
+ *
+ * If you are using Intellij, the above arguments should be put in the configuration's `Program Arguments`.
+ * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
+ * output to file` to record all the log output together.
 *
 * The driver could be decomposed as following stages:
 *
@@ -60,10 +64,10 @@
 * The driver will block for the consumption of all committed records.
 *
 * From this demo, you could see that all the records from pre-population are processed exactly once,
- * in either standalone mode or group mode, with strong partition level ordering guarantee.
+ * with strong partition level ordering guarantee.
 *
 * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run group mode, otherwise the app could throw
+ * in order to run, otherwise the app could throw
 * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
 */
 public class KafkaExactlyOnceDemo {
@@ -72,15 +76,14 @@ public class KafkaExactlyOnceDemo {
     private static final String OUTPUT_TOPIC = "output-topic";
 
     public static void main(String[] args) throws InterruptedException, ExecutionException {
-        if (args.length != 4) {
-            throw new IllegalArgumentException("Should accept 4 parameters: [mode], " +
+        if (args.length != 3) {
+            throw new IllegalArgumentException("Should accept 3 parameters: " +
                 "[number of partitions], [number of instances], [number of records]");
         }
 
-        String mode = args[0];
-        int numPartitions = Integer.parseInt(args[1]);
-        int numInstances = Integer.parseInt(args[2]);
-        int numRecords = Integer.parseInt(args[3]);
+        int numPartitions = Integer.parseInt(args[0]);
+        int numInstances = Integer.parseInt(args[1]);
+        int numRecords = Integer.parseInt(args[2]);
 
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
@@ -99,9 +102,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionException {
 
         /* Stage 3: transactionally process all messages */
         for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode,
-                INPUT_TOPIC, OUTPUT_TOPIC, numPartitions,
-                numInstances, instanceIdx, transactionalCopyLatch);
+            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
+                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
             messageProcessor.start();
         }
 
@@ -112,7 +114,7 @@ public static void main(String[] args) throws InterruptedException, ExecutionException {
         CountDownLatch consumeLatch = new CountDownLatch(1);
 
         /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch);
+        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
         consumerThread.start();
 
         if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
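Stage 1 (recreateTopics) is invoked above but defined outside the shown hunks. A hedged sketch of how such a helper can be built on the Admin client; the class name, bootstrap address, and error handling here are illustrative assumptions, not this commit's code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicRecreationSketch {

    public static void recreateTopics(int numPartitions, List<String> topics) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            try {
                // Drop leftovers from a previous run.
                admin.deleteTopics(topics).all().get();
            } catch (ExecutionException e) {
                // The topics may simply not exist yet; ignore and move on.
            }
            // Deletion completes asynchronously on the broker, so a robust helper
            // would poll listTopics() until the old names disappear before recreating.
            List<NewTopic> newTopics = new ArrayList<>();
            for (String topic : topics) {
                newTopics.add(new NewTopic(topic, numPartitions, (short) 1));
            }
            try {
                admin.createTopics(newTopics).all().get();
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed to create topics " + topics, e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        recreateTopics(6, Arrays.asList("input-topic", "output-topic"));
    }
}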
5 changes: 0 additions & 5 deletions examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -20,11 +20,6 @@ public class KafkaProperties {
     public static final String TOPIC = "topic1";
     public static final String KAFKA_SERVER_URL = "localhost";
     public static final int KAFKA_SERVER_PORT = 9092;
-    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
-    public static final int CONNECTION_TIMEOUT = 100000;
-    public static final String TOPIC2 = "topic2";
-    public static final String TOPIC3 = "topic3";
-    public static final String CLIENT_ID = "SimpleConsumerDemoClient";
 
     private KafkaProperties() {}
 }
