Skip to content

Commit

Permalink
[FLINK-29480][kafka] Skip null ProducerRecord when writing out in KafkaWriter
Browse files Browse the repository at this point in the history

This closes apache#21186.

Co-authored-by: Leonard Xu <[email protected]>
  • Loading branch information
salvalcantara and leonardBang authored Oct 31, 2022
1 parent 79712e1 commit 70825e2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
 * Serializes the given element into a Kafka {@link ProducerRecord}.
 *
 * @param element element to be serialized
 * @param context context to possibly determine target partition
 * @param timestamp timestamp
 * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized
 */
@Nullable
ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);

/** Context providing information of the kafka record target location. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ class KafkaWriter<IN>
}

@Override
public void write(IN element, Context context) throws IOException {
public void write(@Nullable IN element, Context context) throws IOException {
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
currentProducer.send(record, deliveryCallback);
numRecordsOutCounter.inc();
if (record != null) {
currentProducer.send(record, deliveryCallback);
numRecordsOutCounter.inc();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);

// elements for which the serializer returns null should be silently skipped
writer.write(null, SINK_WRITER_CONTEXT);
timeService.trigger();
assertThat(numBytesOut.getCount()).isEqualTo(0L);
assertThat(numRecordsOut.getCount()).isEqualTo(0);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);

// but elements for which a non-null producer record is returned should count
writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
assertThat(numRecordsOut.getCount()).isEqualTo(1);
Expand Down Expand Up @@ -491,6 +500,10 @@ private class DummyRecordSerializer implements KafkaRecordSerializationSchema<In
@Override
public ProducerRecord<byte[], byte[]> serialize(
        Integer element, KafkaSinkContext context, Long timestamp) {
    // Serializers are allowed to drop invalid elements by returning null;
    // a null input models such an element in the tests.
    if (element == null) {
        return null;
    }
    final byte[] payload = ByteBuffer.allocate(Integer.BYTES).putInt(element).array();
    return new ProducerRecord<>(topic, payload);
}
}
Expand Down

0 comments on commit 70825e2

Please sign in to comment.