KafkaIO provides an unbounded source and sink for Kafka topics. Kafka versions 0.9 and above are supported.
- Read from a topic with 8-byte long keys and string values:
    PCollection<KV<Long, String>> kafkaRecords =
        pipeline
            .apply(KafkaIO.read()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopics(ImmutableList.of("topic_a"))
                .withKeyCoder(BigEndianLongCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata());
- Write the same PCollection to a Kafka topic:
    kafkaRecords.apply(KafkaIO.write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("results")
        .withKeyCoder(BigEndianLongCoder.of())
        .withValueCoder(StringUtf8Coder.of()));
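The keys in both examples are serialized with BigEndianLongCoder, which writes each long as 8 bytes in big-endian (network) order on the wire. As a rough illustration (not the coder's actual implementation), the same layout can be produced with java.nio.ByteBuffer, whose default byte order is big-endian:

```java
import java.nio.ByteBuffer;

public class BigEndianLongExample {
    // Sketch of the 8-byte big-endian layout that BigEndianLongCoder
    // uses for keys; ByteBuffer defaults to BIG_ENDIAN byte order.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(1L);
        // 8 bytes total; the least-significant byte comes last.
        System.out.println(bytes.length);
        System.out.println(bytes[7]);
    }
}
```

This matters when another consumer reads the topic: it must decode keys with the same big-endian convention to recover the original long values.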
Please see the JavaDoc for KafkaIO in KafkaIO.java for complete documentation and a more descriptive usage example.
- 0.2.0 : Assigns one split for each Kafka topic partition. This makes Dataflow Update from the previous version incompatible.
- 0.1.0 : Initial release of KafkaIO, with support for an unbounded source and sink.