Skip to content

Commit

Permalink
Showing 3 changed files with 6 additions and 5 deletions.
6 changes: 3 additions & 3 deletions docs/dev/connectors/kafka.md
Original file line number Diff line number Diff line change
@@ -542,14 +542,14 @@ methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` approp
instead of catching and rethrowing them. This essentially counts the record
as having succeeded, even if it was never written to the target Kafka topic. This
must be disabled for at-least-once.
* `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
* `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
With this enabled, Flink's checkpoints will wait for any
on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
succeeding the checkpoint. This ensures that all records before the checkpoint have
been written to Kafka. This must be enabled for at-least-once.

In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
0.9 and 0.10, `setLogFailuresOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
In conclusion, the Kafka producer by default has at-least-once guarantees for versions
0.9 and 0.10, with `setLogFailuresOnly` set to `false` and `setFlushOnCheckpoint` set
to `true`.

**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
Original file line number Diff line number Diff line change
@@ -112,7 +112,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
/**
* If true, the producer will wait until all outstanding records have been sent to the broker.
*/
protected boolean flushOnCheckpoint;
protected boolean flushOnCheckpoint = true;

// -------------------------------- Runtime fields ------------------------------------------

Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -96,7 +97,7 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);

RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
RuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);

0 comments on commit 93176e5

Please sign in to comment.