Skip to content

Commit

Permalink
Fix Topic Pattern functionality for Python functions (apache#2760)
Browse files Browse the repository at this point in the history
* pulsar client no longer has subscribe_pattern interface

* Added integration tests for topic pattern

* Fixed integration test
  • Loading branch information
srkukarni authored Oct 10, 2018
1 parent 6e9b6c2 commit 4ed2e33
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 29 deletions.
5 changes: 3 additions & 2 deletions pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from threading import Timer
import traceback
import sys
import re

import pulsar
import contextimpl
Expand Down Expand Up @@ -202,8 +203,8 @@ def run(self):
self.input_serdes[topic] = serde_kclass()
Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
if consumer_conf.isRegexPattern:
self.consumers[topic] = self.pulsar_client.subscribe_pattern(
str(topic), subscription_name,
self.consumers[topic] = self.pulsar_client.subscribe(
re.compile(str(topic)), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

import com.google.common.base.Stopwatch;
import com.google.gson.Gson;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -581,22 +584,48 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou

@Test(enabled = false)
public void testPythonExclamationFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON);
testExclamationFunction(Runtime.PYTHON, false);
}

@Test(enabled = false)
public void testPythonExclamationTopicPatternFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, true);
}

@Test
public void testJavaExclamationFunction() throws Exception {
testExclamationFunction(Runtime.JAVA);
testExclamationFunction(Runtime.JAVA, false);
}

@Test
public void testJavaExclamationTopicPatternFunction() throws Exception {
testExclamationFunction(Runtime.JAVA, true);
}

private void testExclamationFunction(Runtime runtime) throws Exception {
private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode
return;
}

String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
if (isTopicPattern) {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
.topic(inputTopicName + "1")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
.topic(inputTopicName + "2")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
inputTopicName = inputTopicName + ".*";
}
String functionName = "test-exclamation-fn-" + randomName(8);
final int numMessages = 10;

Expand Down Expand Up @@ -640,7 +669,11 @@ private static <T> void submitFunction(Runtime runtime,
String functionClass,
Schema<T> inputTopicSchema) throws Exception {
CommandGenerator generator;
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
if (inputTopicName.endsWith(".*")) {
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
} else {
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
String command;
Expand Down Expand Up @@ -731,17 +764,40 @@ private static void publishAndConsumeMessages(String inputTopic,
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();
if (inputTopic.endsWith(".*")) {
@Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
@Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();

for (int i = 0; i < numMessages / 2; i++) {
producer1.send("message-" + i);
}

for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send("message-" + i);
}
} else {
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();

for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}
}

Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
expectedMessages.add("message-" + i + "!");
}

for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive();
assertEquals("message-" + i + "!", msg.getValue());
Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
assertTrue(expectedMessages.contains(msg.getValue()));
expectedMessages.remove(msg.getValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum Runtime {
private String namespace = "default";
private String functionClassName;
private String sourceTopic;
private String sourceTopicPattern;
private Map<String, String> customSereSourceTopics;
private String sinkTopic;
private String logTopic;
Expand All @@ -64,28 +65,14 @@ public static CommandGenerator createDefaultGenerator(String sourceTopic, String
return generator;
}

public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
String functionClassName) {
public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) {
CommandGenerator generator = new CommandGenerator();
generator.setCustomSereSourceTopics(customSereSourceTopics);
generator.setSourceTopicPattern(sourceTopicPattern);
generator.setFunctionClassName(functionClassName);
generator.setRuntime(Runtime.JAVA);
return generator;
}

public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
CommandGenerator generator = new CommandGenerator();
generator.setTenant(tenant);
generator.setNamespace(namespace);
generator.setFunctionName(functionName);
generator.setRuntime(Runtime.JAVA);
return generator;
}

public void createAdminUrl(String workerHost, int port) {
adminUrl = "http://" + workerHost + ":" + port;
}

public String generateCreateFunctionCommand() {
return generateCreateFunctionCommand(null);
}
Expand All @@ -110,6 +97,9 @@ public String generateCreateFunctionCommand(String codeFile) {
if (sourceTopic != null) {
commandBuilder.append(" --inputs " + sourceTopic);
}
if (sourceTopicPattern != null) {
commandBuilder.append(" --topics-pattern " + sourceTopicPattern);
}
if (logTopic != null) {
commandBuilder.append(" --logTopic " + logTopic);
}
Expand Down

0 comments on commit 4ed2e33

Please sign in to comment.