
Commit

[BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
xumingmin authored and aromanenko-dev committed Jan 30, 2019
1 parent 93821dd commit 7f17201
Showing 3 changed files with 100 additions and 22 deletions.
@@ -345,6 +345,9 @@ public abstract static class Read<K, V>

abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

@Nullable
abstract Map<String, Object> getOffsetConsumerConfig();

abstract Builder<K, V> toBuilder();

@AutoValue.Builder
@@ -380,6 +383,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);

abstract Read<K, V> build();
}

@@ -656,6 +661,24 @@ public Read<K, V> commitOffsetsInFinalize() {
return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
}

/**
* Sets additional configuration for the backend offset consumer. This may be required for a
* secured Kafka cluster, especially when you see a WARN log message like 'exception while
* fetching latest offset for partition {}. will be retried'.
*
* <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
* 1. the main consumer, which reads data from Kafka;<br>
* 2. the secondary offset consumer, which is used to estimate the backlog by fetching the
* latest offset;<br>
*
* <p>By default, the offset consumer inherits the configuration from the main consumer, with an
* auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
* cluster that requires additional configuration.
*/
public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
}

/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
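Usage sketch (not part of this commit): one way a pipeline might pass security settings to the backend offset consumer through the new withOffsetConsumerConfigOverrides above. The SASL values and the pipeline variable are illustrative placeholders, and a real secured cluster usually needs more (e.g. a JAAS config and truststore settings).

// Placeholder security settings shared by the main consumer and the offset consumer.
Map<String, Object> securityConfig = new HashMap<>();
securityConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
securityConfig.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // The main consumer receives the security settings via the regular consumer config...
        .updateConsumerProperties(securityConfig)
        // ...and the offset consumer receives the same settings, so that fetching the latest
        // offsets for backlog estimation does not fail against the secured brokers.
        .withOffsetConsumerConfigOverrides(securityConfig)
        .withoutMetadata());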
@@ -141,28 +141,7 @@ public boolean start() throws IOException {
consumerPollThread.submit(this::consumerPollLoop);

// offsetConsumer setup :

Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
// override group_id and disable auto_commit so that it does not interfere with main consumer
String offsetGroupId =
String.format(
"%s_offset_consumer_%d_%s",
name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig());
offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
// fetches latest offset for two reasons : (a) to calculate backlog (number of records
// yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
// for (a) is to leave this config unchanged from the main config (i.e. if there are records
// that can't be read because of uncommitted records before them, they shouldn't
// ideally count towards backlog when "read_committed" is enabled. But (b)
// requires finding out if there are any records left to be read (committed or uncommitted).
// Rather than using two separate consumers we will go with better support for (b). If we do
// hit a case where a lot of records are not readable (due to some stuck transactions), the
// pipeline would report more backlog, but would not be able to consume it. It might be ok
// since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();

offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
@@ -726,6 +705,39 @@ private long getSplitBacklogMessageCount() {
return backlogCount;
}

@VisibleForTesting
Map<String, Object> getOffsetConsumerConfig() {
Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
// override group_id and disable auto_commit so that it does not interfere with main consumer
String offsetGroupId =
String.format(
"%s_offset_consumer_%d_%s",
name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);

if (source.getSpec().getOffsetConsumerConfig() != null) {
offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
}

// Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
// fetches latest offset for two reasons : (a) to calculate backlog (number of records
// yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
// for (a) is to leave this config unchanged from the main config (i.e. if there are records
// that can't be read because of uncommitted records before them, they shouldn't
// ideally count towards backlog when "read_committed" is enabled. But (b)
// requires finding out if there are any records left to be read (committed or uncommitted).
// Rather than using two separate consumers we will go with better support for (b). If we do
// hit a case where a lot of records are not readable (due to some stuck transactions), the
// pipeline would report more backlog, but would not be able to consume it. It might be ok
// since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

return offsetConsumerConfig;
}

@Override
public void close() throws IOException {
closed.set(true);
@@ -1528,6 +1528,49 @@ public void testSinkMetrics() throws Exception {
}
}

@Test
public void testOffsetConsumerConfigOverrides() throws Exception {
KafkaUnboundedReader reader1 =
new KafkaUnboundedReader(
new KafkaUnboundedSource(
KafkaIO.read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withOffsetConsumerConfigOverrides(null),
0),
null);
assertTrue(
reader1
.getOffsetConsumerConfig()
.get(ConsumerConfig.GROUP_ID_CONFIG)
.toString()
.matches(".*_offset_consumer_\\d+_none"));
assertEquals(
false, reader1.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
assertEquals(
"read_uncommitted",
reader1.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));

String offsetGroupId = "group.offsetConsumer";
KafkaUnboundedReader reader2 =
new KafkaUnboundedReader(
new KafkaUnboundedSource(
KafkaIO.read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withOffsetConsumerConfigOverrides(
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId)),
0),
null);
assertEquals(
offsetGroupId, reader2.getOffsetConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG));
assertEquals(
false, reader2.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
assertEquals(
"read_uncommitted",
reader2.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
}
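A possible follow-up test, not part of this commit: since getOffsetConsumerConfig() applies the user overrides before forcing the isolation level, an override of isolation.level has no effect on the offset consumer. A sketch along the lines of the test above could pin that behavior down.

@Test
public void testOffsetConsumerIsolationLevelNotOverridable() throws Exception {
  // Hypothetical sketch: even when the override map asks for 'read_committed', the offset
  // consumer ends up with 'read_uncommitted', because the isolation level is set after the
  // user overrides are applied.
  KafkaUnboundedReader reader =
      new KafkaUnboundedReader(
          new KafkaUnboundedSource(
              KafkaIO.read()
                  .withBootstrapServers("broker_1:9092,broker_2:9092")
                  .withTopic("my_topic")
                  .withOffsetConsumerConfigOverrides(
                      ImmutableMap.of(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")),
              0),
          null);
  assertEquals(
      "read_uncommitted",
      reader.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
}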

private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,
