diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 8c1cb2740d656..e1c4ec7347f9b 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -61,7 +61,7 @@ * .setGroupId("MyGroup") * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) * .setDeserializer(new TestingKafkaRecordDeserializer()) - * .setStartingOffsetInitializer(OffsetsInitializer.earliest()) + * .setStartingOffsets(OffsetsInitializer.earliest()) * .build(); * } * diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 35cfd364d42ab..bc34630b0d90e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -290,7 +290,7 @@ private void assignPendingPartitionSplits() { private KafkaConsumer<byte[], byte[]> getKafkaConsumer() { Properties consumerProps = new Properties(); - copyProperty(properties, consumerProps, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + deepCopyProperties(properties, consumerProps); // set client id prefix String clientIdPrefix = consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()); @@ -309,7 +309,7 @@ private KafkaConsumer<byte[], byte[]> getKafkaConsumer() { private AdminClient getKafkaAdminClient() { Properties adminClientProps = new Properties(); - copyProperty(properties, adminClientProps, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + deepCopyProperties(properties, adminClientProps); // set client id prefix String clientIdPrefix = adminClientProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()); @@ -351,8 +351,11 @@ static int getSplitOwner(TopicPartition tp, int numReaders) { return (startIndex + tp.partition()) % numReaders; } - private static void copyProperty(Properties from, Properties to, String key) { - to.setProperty(key, from.getProperty(key)); + @VisibleForTesting + static void deepCopyProperties(Properties from, Properties to) { + for (String key : from.stringPropertyNames()) { + to.setProperty(key, from.getProperty(key)); + } } // --------------- private class --------------- diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index f1d10597ff32a..c951f0bfd7436 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -26,9 +26,12 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.mock.Whitebox; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.AfterClass; @@ -267,7 +270,8 @@ public void 
testWorkWithPreexistingAssignments() throws Throwable { context2, ENABLE_PERIODIC_PARTITION_DISCOVERY, PRE_EXISTING_TOPICS, - preexistingAssignments)) { + preexistingAssignments, + new Properties())) { enumerator.start(); context2.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX); @@ -280,6 +284,46 @@ public void testWorkWithPreexistingAssignments() throws Throwable { } } + @Test + public void testKafkaClientProperties() { + MockSplitEnumeratorContext<KafkaPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + Properties properties = new Properties(); + String clientIdPrefix = "test-prefix"; + Integer defaultTimeoutMs = 99999; + properties.setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), clientIdPrefix); + properties.setProperty( + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(defaultTimeoutMs)); + try (KafkaSourceEnumerator enumerator = + createEnumerator( + context, + ENABLE_PERIODIC_PARTITION_DISCOVERY, + PRE_EXISTING_TOPICS, + Collections.emptyMap(), + properties)) { + enumerator.start(); + + AdminClient adminClient = + (AdminClient) Whitebox.getInternalState(enumerator, "adminClient"); + assertNotNull(adminClient); + String clientId = (String) Whitebox.getInternalState(adminClient, "clientId"); + assertNotNull(clientId); + assertTrue(clientId.startsWith(clientIdPrefix)); + assertEquals( + defaultTimeoutMs, Whitebox.getInternalState(adminClient, "defaultTimeoutMs")); + + KafkaConsumer<?, ?> consumer = + (KafkaConsumer<?, ?>) Whitebox.getInternalState(enumerator, "consumer"); + assertNotNull(consumer); + clientId = (String) Whitebox.getInternalState(consumer, "clientId"); + assertNotNull(clientId); + assertTrue(clientId.startsWith(clientIdPrefix)); + assertEquals( + (long) defaultTimeoutMs, + Whitebox.getInternalState(consumer, "requestTimeoutMs")); + } + } + // -------------- some common startup sequence --------------- private void startEnumeratorAndRegisterReaders( @@ -323,7 +367,11 @@ private KafkaSourceEnumerator createEnumerator( 
topics.add(DYNAMIC_TOPIC_NAME); } return createEnumerator( - enumContext, enablePeriodicPartitionDiscovery, topics, Collections.emptyMap()); + enumContext, + enablePeriodicPartitionDiscovery, + topics, + Collections.emptyMap(), + new Properties()); } /** @@ -334,7 +382,8 @@ private KafkaSourceEnumerator createEnumerator( MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext, boolean enablePeriodicPartitionDiscovery, Collection<String> topicsToSubscribe, - Map<Integer, Set<KafkaPartitionSplit>> currentAssignments) { + Map<Integer, Set<KafkaPartitionSplit>> currentAssignments, + Properties overrideProperties) { // Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been // created yet. StringJoiner topicNameJoiner = new StringJoiner("|"); @@ -347,6 +396,7 @@ private KafkaSourceEnumerator createEnumerator( Properties props = new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class)); + KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props); String partitionDiscoverInterval = enablePeriodicPartitionDiscovery ? "1" : "-1"; props.setProperty( KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),