[BEAM-7143] adding withConsumerConfigUpdates
ihji committed Apr 25, 2019
1 parent e75f46f commit 477a2a0
Showing 4 changed files with 36 additions and 12 deletions.
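For orientation, here is a minimal sketch (not part of the commit itself) of what the rename means for a typical KafkaIO read; the broker address and topic name are placeholders:

    KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")                    // placeholder servers
        .withTopics(Collections.singletonList("my_topic"))        // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Before this commit:
        //     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
        // After this commit (updateConsumerProperties is now deprecated):
        .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));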
@@ -260,7 +260,7 @@ private SparkPipelineResult run(Optional<Instant> stopWatermarkOption, int expec
.withTopics(Collections.singletonList(TOPIC))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(InstantDeserializer.class)
- .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
+ .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
.withTimestampFn(KV::getValue)
.withWatermarkFn(
kv -> {
@@ -86,15 +86,15 @@ public PCollection<Row> buildIOReader(PBegin begin) {
KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(bootstrapServers)
.withTopics(topics)
- .updateConsumerProperties(configUpdates)
+ .withConsumerConfigUpdates(configUpdates)
.withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
.withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
} else if (topicPartitions != null) {
kafkaRead =
KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(bootstrapServers)
.withTopicPartitions(topicPartitions)
- .updateConsumerProperties(configUpdates)
+ .withConsumerConfigUpdates(configUpdates)
.withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
.withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
} else {
@@ -116,7 +116,7 @@
*
* // you can further customize KafkaConsumer used to read the records by adding more
* // settings for ConsumerConfig. e.g :
- * .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
+ * .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
*
* // set event times and watermark based on 'LogAppendTime'. To provide a custom
* // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
@@ -157,7 +157,7 @@
* <p>When the pipeline starts for the first time, or without any checkpoint, the source starts
* consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
beginning by setting appropriate properties in {@link ConsumerConfig}, through {@link
- Read#updateConsumerProperties(Map)}. You can also enable offset auto_commit in Kafka to resume
+ Read#withConsumerConfigUpdates(Map)}. You can also enable offset auto_commit in Kafka to resume
* from last committed.
*
* <p>In summary, KafkaIO.read follows below sequence to set initial offset:<br>
Expand Down Expand Up @@ -515,7 +515,7 @@ public void setValueDeserializer(byte[] valueDeserializer) {

/** Sets the bootstrap servers for the Kafka consumer. */
public Read<K, V> withBootstrapServers(String bootstrapServers) {
- return updateConsumerProperties(
+ return withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}

@@ -610,7 +610,12 @@ public Read<K, V> withConsumerFactoryFn(
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
}

- /** Update consumer configuration with new properties. */
+ /**
+ * Update consumer configuration with new properties.
+ *
+ * @deprecated as of version 2.13. Use {@link #withConsumerConfigUpdates(Map)} instead
+ */
+ @Deprecated
public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
Map<String, Object> config =
updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
@@ -771,7 +776,7 @@ public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> waterm
* read committed messages. See JavaDoc for {@link KafkaConsumer} for more description.
*/
public Read<K, V> withReadCommitted() {
- return updateConsumerProperties(ImmutableMap.of("isolation.level", "read_committed"));
+ return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
}

/**
@@ -804,6 +809,24 @@ public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetCo
return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
}

+ /**
+ * Update configuration for the backend main consumer. Note that the default consumer properties
+ * will not be completely overridden; this method only updates the values for the keys provided.
+ *
+ * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
+ * 1. the main consumer, which reads data from Kafka;<br>
+ * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
+ * offsets;<br>
+ *
+ * <p>By default, the main consumer uses the configuration from {@link
+ * #DEFAULT_CONSUMER_PROPERTIES}.
+ */
+ public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
+   Map<String, Object> config =
+       updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);
+   return toBuilder().setConsumerConfig(config).build();
+ }

/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
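To illustrate the distinction the new javadoc draws between the two backend consumers, a hedged usage sketch (group ids, servers, and topic are placeholders): withConsumerConfigUpdates merges its entries onto DEFAULT_CONSUMER_PROPERTIES for the main consumer, while withOffsetConsumerConfigOverrides targets the secondary consumer that fetches the latest offsets for backlog estimation:

    KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")                    // placeholder servers
        .withTopics(Collections.singletonList("events"))          // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // merged into the default config of the main (record-reading) consumer:
        .withConsumerConfigUpdates(
            ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app"))
        // applied to the secondary consumer used for backlog estimation:
        .withOffsetConsumerConfigOverrides(
            ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app_offsets"));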
@@ -420,7 +420,7 @@ public void testUnreachableKafkaBrokers() {
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
- .updateConsumerProperties(
+ .withConsumerConfigUpdates(
ImmutableMap.of(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
@@ -589,7 +589,7 @@ public void testUnboundedSourceCreateTimestamps() {
p.apply(
mkKafkaReadTransform(numElements, null)
.withCreateTime(Duration.ZERO)
- .updateConsumerProperties(
+ .withConsumerConfigUpdates(
ImmutableMap.of(
TIMESTAMP_TYPE_CONFIG,
"CreateTime",
@@ -670,7 +670,7 @@ public void testUnboundedSourceWithExceptionInKafkaFetch() {
new ConsumerFactoryFn(
ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
.withMaxNumRecords(2 * numElements) // Try to read more messages than available.
- .updateConsumerProperties(ImmutableMap.of("inject.error.at.eof", true))
+ .withConsumerConfigUpdates(ImmutableMap.of("inject.error.at.eof", true))
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class);

@@ -912,7 +912,8 @@ public void testUnboundedSourceMetrics() {
p.apply(
readStep,
mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
- .updateConsumerProperties(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
.commitOffsetsInFinalize()
.withoutMetadata());
