From a010166f9e67587a065fbe50657a9c76fb7403a2 Mon Sep 17 00:00:00 2001 From: penghui Date: Fri, 28 Dec 2018 13:43:12 +0800 Subject: [PATCH] Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256) ### Motivation Fixes #3255 Easier to consume multiple topics by PulsarConsumerSource. ### Modifications Add multi-topic and pattern-topic set for PulsarConsumerSource. ### Result UT passed. --- .../pulsar/PulsarConsumerSource.java | 25 ++++-- .../pulsar/PulsarSourceBuilder.java | 79 +++++++++++++++++-- .../pulsar/PulsarSourceBuilderTest.java | 30 ++++++- 3 files changed, 118 insertions(+), 16 deletions(-) diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java index 84e0e50664c93..6479bf0b1a401 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.IOUtils; @@ -41,6 +42,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; /** * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. @@ -55,7 +57,8 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase topicNames; + private final Pattern topicsPattern; private final String subscriptionName; private final DeserializationSchema deserializer; @@ -72,7 +75,8 @@ class PulsarConsumerSource extends MessageAcknowledgingSourceBase builder) { super(MessageId.class); this.serviceUrl = builder.serviceUrl; - this.topic = builder.topic; + this.topicNames = builder.topicNames; + this.topicsPattern = builder.topicsPattern; this.deserializer = builder.deserializationSchema; this.subscriptionName = builder.subscriptionName; this.acknowledgementBatchSize = builder.acknowledgementBatchSize; @@ -191,10 +195,17 @@ PulsarClient createClient() throws PulsarClientException { } Consumer createConsumer(PulsarClient client) throws PulsarClientException { - return client.newConsumer() - .topic(topic) - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Failover) - .subscribe(); + if (topicsPattern != null) { + return client.newConsumer().topicsPattern(topicsPattern) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + } else { + return client.newConsumer() + .topics(Lists.newArrayList(topicNames)) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + } } } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java index 9605f079a3516..7d06806d60f92 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java @@ -24,6 +24,12 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Preconditions; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Pattern; + /** * A class for building a pulsar source. */ @@ -36,7 +42,8 @@ public class PulsarSourceBuilder { final DeserializationSchema deserializationSchema; String serviceUrl = SERVICE_URL; - String topic; + final Set topicNames = new TreeSet<>(); + Pattern topicsPattern; String subscriptionName = "flink-sub"; long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE; @@ -57,18 +64,74 @@ public PulsarSourceBuilder serviceUrl(String serviceUrl) { } /** - * Sets the topic to consumer from. This is required. + * Sets topics to consumer from. This is required. + * + *

Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics) + * are in the following format: + * {persistent|non-persistent}://tenant/namespace/topic + * + * @param topics the topic to consumer from + * @return this builder + */ + public PulsarSourceBuilder topic(String... topics) { + Preconditions.checkArgument(topics != null && topics.length > 0, + "topics cannot be blank"); + for (String topic : topics) { + Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic"); + } + this.topicNames.addAll(Arrays.asList(topics)); + return this; + } + + /** + * Sets topics to consumer from. This is required. + * + *

Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics) + * are in the following format: + * {persistent|non-persistent}://tenant/namespace/topic + * + * @param topics the topic to consumer from + * @return this builder + */ + public PulsarSourceBuilder topics(List topics) { + Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank"); + topics.forEach(topicName -> + Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic")); + this.topicNames.addAll(topics); + return this; + } + + /** + * Use topic pattern to config sets of topics to consumer + * + *

Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics) + * are in the following format: + * {persistent|non-persistent}://tenant/namespace/topic + * + * @param topicsPattern topic pattern to consumer from + * @return this builder + */ + public PulsarSourceBuilder topicsPattern(Pattern topicsPattern) { + Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null"); + Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set."); + this.topicsPattern = topicsPattern; + return this; + } + + /** + * Use topic pattern to config sets of topics to consumer * *

Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics) * are in the following format: * {persistent|non-persistent}://tenant/namespace/topic * - * @param topic the topic to consumer from + * @param topicsPattern topic pattern string to consumer from * @return this builder */ - public PulsarSourceBuilder topic(String topic) { - Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); - this.topic = topic; + public PulsarSourceBuilder topicsPatternString(String topicsPattern) { + Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank"); + Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set."); + this.topicsPattern = Pattern.compile(topicsPattern); return this; } @@ -101,9 +164,9 @@ public PulsarSourceBuilder acknowledgementBatchSize(long size) { public SourceFunction build() { Preconditions.checkNotNull(serviceUrl, "a service url is required"); - Preconditions.checkNotNull(topic, "a topic is required"); + Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null, + "At least one topic or topics pattern is required"); Preconditions.checkNotNull(subscriptionName, "a subscription name is required"); - return new PulsarConsumerSource<>(this); } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java index 5a916e88099df..b6b159d99e396 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java @@ -26,6 +26,8 @@ import org.junit.Test; import java.io.IOException; +import java.util.Arrays; +import java.util.regex.Pattern; /** * Tests for PulsarSourceBuilder @@ -49,7 +51,7 @@ public void testBuild() { Assert.assertNotNull(sourceFunction); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testBuildWithoutSettingRequiredProperties() { pulsarSourceBuilder.build(); } @@ -74,6 +76,32 @@ public void testTopicWithBlank() { pulsarSourceBuilder.topic(" "); } + @Test(expected = IllegalArgumentException.class) + public void testTopicsWithNull() { + pulsarSourceBuilder.topics(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testTopicsWithBlank() { + pulsarSourceBuilder.topics(Arrays.asList(" ", " ")); + } + + @Test(expected = IllegalArgumentException.class) + public void testTopicPatternWithNull() { + pulsarSourceBuilder.topicsPattern(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testTopicPatternAlreadySet() { + pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*")); + pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*")); + } + + @Test(expected = IllegalArgumentException.class) + public void testTopicPattenStringWithNull() { + pulsarSourceBuilder.topicsPatternString(null); + } + @Test(expected = IllegalArgumentException.class) public void testSubscriptionNameWithNull() { pulsarSourceBuilder.subscriptionName(null);