[FLINK-21059][kafka] KafkaSourceEnumerator does not honor consumer properties
tweise committed Jan 27, 2021
1 parent dac3e72 commit a4f1174
Showing 3 changed files with 61 additions and 8 deletions.
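
For context, the user-facing effect is that consumer properties set on the source builder, not only bootstrap.servers, now reach the KafkaConsumer and AdminClient that the enumerator creates for partition discovery. Below is a minimal, hedged sketch mirroring the Javadoc builder example in the first changed file; the broker address and element type are placeholders, TOPIC1/TOPIC2 and TestingKafkaRecordDeserializer come from that Javadoc, and setBootstrapServers/setProperty are assumed to be available on the builder:

    // Sketch only: placeholder broker address and element type.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setGroupId("MyGroup")
                    .setTopics(Arrays.asList(TOPIC1, TOPIC2))
                    .setDeserializer(new TestingKafkaRecordDeserializer())
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // With this fix, both of these settings are forwarded to the
                    // enumerator's internal AdminClient and KafkaConsumer as well.
                    .setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), "my-prefix")
                    .setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "99999")
                    .build();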
@@ -61,7 +61,7 @@
* .setGroupId("MyGroup")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(new TestingKafkaRecordDeserializer())
- * .setStartingOffsetInitializer(OffsetsInitializer.earliest())
+ * .setStartingOffsets(OffsetsInitializer.earliest())
* .build();
* }</pre>
*
@@ -290,7 +290,7 @@ private void assignPendingPartitionSplits() {

private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
Properties consumerProps = new Properties();
- copyProperty(properties, consumerProps, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ deepCopyProperties(properties, consumerProps);
// set client id prefix
String clientIdPrefix =
consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
@@ -309,7 +309,7 @@ private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {

private AdminClient getKafkaAdminClient() {
Properties adminClientProps = new Properties();
- copyProperty(properties, adminClientProps, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ deepCopyProperties(properties, adminClientProps);
// set client id prefix
String clientIdPrefix =
adminClientProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
@@ -351,8 +351,11 @@ static int getSplitOwner(TopicPartition tp, int numReaders) {
return (startIndex + tp.partition()) % numReaders;
}

- private static void copyProperty(Properties from, Properties to, String key) {
- to.setProperty(key, from.getProperty(key));
+ @VisibleForTesting
+ static void deepCopyProperties(Properties from, Properties to) {
+ for (String key : from.stringPropertyNames()) {
+ to.setProperty(key, from.getProperty(key));
+ }
}

// --------------- private class ---------------
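
A side note on the new deepCopyProperties helper above (a minimal sketch of plain java.util.Properties behavior, not part of the commit): copying key by key via stringPropertyNames() also carries over keys that exist only in a defaults table (new Properties(defaults)), which a putAll-based copy would silently drop:

    import java.util.Properties;

    public class PropertiesCopyDemo {
        public static void main(String[] args) {
            Properties defaults = new Properties();
            defaults.setProperty("request.timeout.ms", "99999");

            // Keys in the defaults table are visible to getProperty() and
            // stringPropertyNames(), but are not entries of the hashtable itself.
            Properties from = new Properties(defaults);
            from.setProperty("client.id.prefix", "my-prefix");

            Properties shallow = new Properties();
            shallow.putAll(from); // copies only "client.id.prefix"
            System.out.println(shallow.getProperty("request.timeout.ms")); // null

            Properties deep = new Properties();
            for (String key : from.stringPropertyNames()) { // includes defaulted keys
                deep.setProperty(key, from.getProperty(key));
            }
            System.out.println(deep.getProperty("request.timeout.ms")); // 99999
        }
    }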
@@ -26,9 +26,12 @@
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+ import org.apache.flink.mock.Whitebox;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+ import org.apache.kafka.clients.consumer.ConsumerConfig;
+ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.AfterClass;
@@ -267,7 +270,8 @@ public void testWorkWithPreexistingAssignments() throws Throwable {
context2,
ENABLE_PERIODIC_PARTITION_DISCOVERY,
PRE_EXISTING_TOPICS,
- preexistingAssignments)) {
+ preexistingAssignments,
+ new Properties())) {
enumerator.start();
context2.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);

@@ -280,6 +284,46 @@ public void testWorkWithPreexistingAssignments() throws Throwable {
}
}

+ @Test
+ public void testKafkaClientProperties() {
+ MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+ Properties properties = new Properties();
+ String clientIdPrefix = "test-prefix";
+ Integer defaultTimeoutMs = 99999;
+ properties.setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), clientIdPrefix);
+ properties.setProperty(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(defaultTimeoutMs));
+ try (KafkaSourceEnumerator enumerator =
+ createEnumerator(
+ context,
+ ENABLE_PERIODIC_PARTITION_DISCOVERY,
+ PRE_EXISTING_TOPICS,
+ Collections.emptyMap(),
+ properties)) {
+ enumerator.start();
+
+ AdminClient adminClient =
+ (AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
+ assertNotNull(adminClient);
+ String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
+ assertNotNull(clientId);
+ assertTrue(clientId.startsWith(clientIdPrefix));
+ assertEquals(
+ defaultTimeoutMs, Whitebox.getInternalState(adminClient, "defaultTimeoutMs"));
+
+ KafkaConsumer<?, ?> consumer =
+ (KafkaConsumer) Whitebox.getInternalState(enumerator, "consumer");
+ assertNotNull(consumer);
+ clientId = (String) Whitebox.getInternalState(consumer, "clientId");
+ assertNotNull(clientId);
+ assertTrue(clientId.startsWith(clientIdPrefix));
+ assertEquals(
+ (long) defaultTimeoutMs,
+ Whitebox.getInternalState(consumer, "requestTimeoutMs"));
+ }
+ }
+
// -------------- some common startup sequence ---------------

private void startEnumeratorAndRegisterReaders(
@@ -323,7 +367,11 @@ private KafkaSourceEnumerator createEnumerator(
topics.add(DYNAMIC_TOPIC_NAME);
}
return createEnumerator(
- enumContext, enablePeriodicPartitionDiscovery, topics, Collections.emptyMap());
+ enumContext,
+ enablePeriodicPartitionDiscovery,
+ topics,
+ Collections.emptyMap(),
+ new Properties());
}

/**
@@ -334,7 +382,8 @@ private KafkaSourceEnumerator createEnumerator(
MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
boolean enablePeriodicPartitionDiscovery,
Collection<String> topicsToSubscribe,
- Map<Integer, Set<KafkaPartitionSplit>> currentAssignments) {
+ Map<Integer, Set<KafkaPartitionSplit>> currentAssignments,
+ Properties overrideProperties) {
// Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been
// created yet.
StringJoiner topicNameJoiner = new StringJoiner("|");
@@ -347,6 +396,7 @@ private KafkaSourceEnumerator createEnumerator(

Properties props =
new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class));
+ KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props);
String partitionDiscoverInterval = enablePeriodicPartitionDiscovery ? "1" : "-1";
props.setProperty(
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
