[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of transaction timeouts.

TwoPhaseCommitSinkFunction now allows configuring a transaction timeout. The
timeout can be used to log warnings when a transaction's age is approaching
the timeout, and to swallow exceptions that are likely irrecoverable. This
commit also integrates these changes into the FlinkKafkaProducer011.

This closes apache#4910.
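Usage sketch (not part of this commit): with EXACTLY_ONCE semantic, the constructor now propagates Kafka's transaction.timeout.ms setting to the two-phase-commit machinery, and users can opt in to ignoring commit failures of presumably timed-out transactions during recovery. The snippet below assumes the constructor variant taking (topic, KeyedSerializationSchema, Properties, Semantic); the topic name, broker address, and timeout value are placeholders, and the SimpleStringSchema package location varies across Flink versions.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ExactlyOnceKafkaSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // exactly-once delivery requires checkpointing

        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Optional override; if this key is absent, the constructor falls back to the
        // connector default and logs a warning (see the diff below).
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000"); // placeholder: 15 minutes

        FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
                "example-topic", // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        // Opt in to swallowing commit failures of transactions that have presumably
        // outlived their timeout during recovery; such failures are still logged
        // because they can indicate data loss.
        sink.ignoreFailuresAfterTransactionTimeout();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("exactly-once Kafka 0.11 sink example");
    }
}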
GJL authored and tzulitai committed Nov 2, 2017
1 parent 7fb7e0b commit 8cdf2ff
Showing 5 changed files with 589 additions and 120 deletions.
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
@@ -49,6 +50,7 @@

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -91,6 +93,7 @@
* will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link Semantic#EXACTLY_ONCE} please refer to Flink's
* Kafka connector documentation.
*/
@PublicEvolving
public class FlinkKafkaProducer011<IN>
extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {

@@ -446,13 +449,31 @@ public FlinkKafkaProducer011(
throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}

if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
}

// Enable transactionTimeoutWarnings to avoid silent data loss
// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
// The KafkaProducer may not throw an exception if the transaction failed to commit
if (semantic == Semantic.EXACTLY_ONCE) {
final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long transactionTimeout;
if (object instanceof String && StringUtils.isNumeric((String) object)) {
transactionTimeout = Long.parseLong((String) object);
} else if (object instanceof Number) {
transactionTimeout = ((Number) object).longValue();
} else {
throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+ " must be numeric, was " + object);
}
super.setTransactionTimeout(transactionTimeout);
super.enableTransactionTimeoutWarnings(0.8);
}

this.topicPartitionsMap = new HashMap<>();
}

@@ -480,6 +501,22 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
}

/**
* Disables the propagation of exceptions thrown when committing presumably timed out Kafka
* transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
never be successful. Hence, use this feature to avoid recovery loops of the job. Exceptions
* will still be logged to inform the user that data loss might have occurred.
*
* <p>Note that we use {@link System#currentTimeMillis()} to track the age of a transaction.
* Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
* attempt at least one commit of the transaction before giving up.</p>
*/
@Override
public FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout() {
super.ignoreFailuresAfterTransactionTimeout();
return this;
}

// ----------------------------------- Utilities --------------------------

/**
@@ -556,6 +593,7 @@ record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, seria

@Override
public void close() throws Exception {
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null) {
// to avoid exceptions on aborting transactions with some pending records
flush(currentTransaction);
@@ -588,6 +626,7 @@ protected KafkaTransactionState beginTransaction() throws Exception {
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
return new KafkaTransactionState(currentTransaction.producer);
}
@@ -603,7 +642,7 @@ private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws
String transactionalId = availableTransactionalIds.poll();
if (transactionalId == null) {
throw new Exception(
"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checktpoins.");
"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
}
producer = initTransactionalProducer(transactionalId, true);
producer.initTransactions();
@@ -645,10 +684,9 @@ protected void commit(KafkaTransactionState transaction) {
protected void recoverAndCommit(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
KafkaTransactionState kafkaTransaction = transaction;
FlinkKafkaProducer<byte[], byte[]> producer =
initTransactionalProducer(kafkaTransaction.transactionalId, false);
producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
initTransactionalProducer(transaction.transactionalId, false);
producer.resumeTransaction(transaction.producerId, transaction.epoch);
try {
producer.commitTransaction();
producer.close();
@@ -857,6 +895,7 @@ private String generateTransactionalId(long transactionalId) {
}

int getTransactionCoordinatorId() {
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction == null || currentTransaction.producer == null) {
throw new IllegalArgumentException();
}
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder;

import java.util.Collections;
import java.util.Optional;
@@ -59,42 +60,43 @@ protected int getLength() {
protected TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>[] getTestData() {
//noinspection unchecked
return new TwoPhaseCommitSinkFunction.State[] {
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
Collections.emptyList(),
Optional.empty()),
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 2711),
Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 42)),
Optional.empty()),
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
Collections.emptyList(),
Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))),
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
Collections.emptyList(),
Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello")))),
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0)),
Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.emptySet()))),
new TwoPhaseCommitSinkFunction.State<
FlinkKafkaProducer011.KafkaTransactionState,
FlinkKafkaProducer011.KafkaTransactionContext>(
new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null),
Collections.singletonList(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null)),
new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer011.KafkaTransactionState("fake", 1L, (short) 42, null), 0)),
Optional.of(new FlinkKafkaProducer011.KafkaTransactionContext(Collections.singleton("hello"))))
};
}
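The second argument passed to TransactionHolder in the test data above appears to be the wall-clock time at which the transaction was opened; that is the value the sink compares against the configured timeout. A minimal, hypothetical illustration of such bookkeeping (class and field names assumed for illustration, not taken from the Flink sources):

/**
 * Hypothetical sketch of timeout bookkeeping: a transaction handle is stored
 * together with its System.currentTimeMillis() start time, and its age is
 * later compared against the configured transaction timeout.
 */
final class TimedTransaction<TXN> {

    private final TXN handle;
    private final long transactionStartTime; // wall-clock millis when the transaction began

    TimedTransaction(TXN handle, long transactionStartTime) {
        this.handle = handle;
        this.transactionStartTime = transactionStartTime;
    }

    long elapsedTime(long nowMillis) {
        return nowMillis - transactionStartTime;
    }

    /** True once the transaction's age exceeds the given fraction of the timeout. */
    boolean shouldWarn(long transactionTimeoutMillis, double warningRatio, long nowMillis) {
        return elapsedTime(nowMillis) > transactionTimeoutMillis * warningRatio;
    }

    /** True once the transaction has presumably outlived its timeout. */
    boolean isPresumablyTimedOut(long transactionTimeoutMillis, long nowMillis) {
        return elapsedTime(nowMillis) > transactionTimeoutMillis;
    }
}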
