Skip to content

Commit

Permalink
Support multi-topic and pattern-topic for PulsarConsumerSource in pul…
Browse files Browse the repository at this point in the history
…sar (apache#3256)

### Motivation

Fixes apache#3255 

Easier to consume multiple topics by PulsarConsumerSource.

### Modifications

Add multi-topic and pattern-topic set for PulsarConsumerSource.

### Result

UT passed.
  • Loading branch information
codelipenghui authored and sijie committed Dec 28, 2018
1 parent 209c448 commit a010166
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.IOUtils;
Expand All @@ -41,6 +42,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
* Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
Expand All @@ -55,7 +57,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI

private final int messageReceiveTimeoutMs = 100;
private final String serviceUrl;
private final String topic;
private final Set<String> topicNames;
private final Pattern topicsPattern;
private final String subscriptionName;
private final DeserializationSchema<T> deserializer;

Expand All @@ -72,7 +75,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
super(MessageId.class);
this.serviceUrl = builder.serviceUrl;
this.topic = builder.topic;
this.topicNames = builder.topicNames;
this.topicsPattern = builder.topicsPattern;
this.deserializer = builder.deserializationSchema;
this.subscriptionName = builder.subscriptionName;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
Expand Down Expand Up @@ -191,10 +195,17 @@ PulsarClient createClient() throws PulsarClientException {
}

Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
return client.newConsumer()
.topic(topic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
if (topicsPattern != null) {
return client.newConsumer().topicsPattern(topicsPattern)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
} else {
return client.newConsumer()
.topics(Lists.newArrayList(topicNames))
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/**
* A class for building a pulsar source.
*/
Expand All @@ -36,7 +42,8 @@ public class PulsarSourceBuilder<T> {

final DeserializationSchema<T> deserializationSchema;
String serviceUrl = SERVICE_URL;
String topic;
final Set<String> topicNames = new TreeSet<>();
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;

Expand All @@ -57,18 +64,74 @@ public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
}

/**
* Sets the topic to consumer from. This is required.
* Sets topics to consumer from. This is required.
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
*
* @param topics the topic to consumer from
* @return this builder
*/
public PulsarSourceBuilder<T> topic(String... topics) {
Preconditions.checkArgument(topics != null && topics.length > 0,
"topics cannot be blank");
for (String topic : topics) {
Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
}
this.topicNames.addAll(Arrays.asList(topics));
return this;
}

/**
* Sets topics to consumer from. This is required.
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
*
* @param topics the topic to consumer from
* @return this builder
*/
public PulsarSourceBuilder<T> topics(List<String> topics) {
Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
topics.forEach(topicName ->
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
this.topicNames.addAll(topics);
return this;
}

/**
* Use topic pattern to config sets of topics to consumer
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
*
* @param topicsPattern topic pattern to consumer from
* @return this builder
*/
public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
this.topicsPattern = topicsPattern;
return this;
}

/**
* Use topic pattern to config sets of topics to consumer
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
*
* @param topic the topic to consumer from
* @param topicsPattern topic pattern string to consumer from
* @return this builder
*/
public PulsarSourceBuilder<T> topic(String topic) {
Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
this.topic = topic;
public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
this.topicsPattern = Pattern.compile(topicsPattern);
return this;
}

Expand Down Expand Up @@ -101,9 +164,9 @@ public PulsarSourceBuilder<T> acknowledgementBatchSize(long size) {

public SourceFunction<T> build() {
Preconditions.checkNotNull(serviceUrl, "a service url is required");
Preconditions.checkNotNull(topic, "a topic is required");
Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
"At least one topic or topics pattern is required");
Preconditions.checkNotNull(subscriptionName, "a subscription name is required");

return new PulsarConsumerSource<>(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.regex.Pattern;

/**
* Tests for PulsarSourceBuilder
Expand All @@ -49,7 +51,7 @@ public void testBuild() {
Assert.assertNotNull(sourceFunction);
}

@Test(expected = NullPointerException.class)
@Test(expected = IllegalArgumentException.class)
public void testBuildWithoutSettingRequiredProperties() {
pulsarSourceBuilder.build();
}
Expand All @@ -74,6 +76,32 @@ public void testTopicWithBlank() {
pulsarSourceBuilder.topic(" ");
}

@Test(expected = IllegalArgumentException.class)
public void testTopicsWithNull() {
pulsarSourceBuilder.topics(null);
}

@Test(expected = IllegalArgumentException.class)
public void testTopicsWithBlank() {
pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
}

@Test(expected = IllegalArgumentException.class)
public void testTopicPatternWithNull() {
pulsarSourceBuilder.topicsPattern(null);
}

@Test(expected = IllegalArgumentException.class)
public void testTopicPatternAlreadySet() {
pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*"));
pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*"));
}

@Test(expected = IllegalArgumentException.class)
public void testTopicPattenStringWithNull() {
pulsarSourceBuilder.topicsPatternString(null);
}

@Test(expected = IllegalArgumentException.class)
public void testSubscriptionNameWithNull() {
pulsarSourceBuilder.subscriptionName(null);
Expand Down

0 comments on commit a010166

Please sign in to comment.