Skip to content

Commit

Permalink
function to read compacted topics (apache#7193)
Browse files Browse the repository at this point in the history
Fixes apache#5538

### Motivation



### Modifications
In function mode and sink mode, PulsarSource can read compacted topic。
By `inputSpecs` parameter, each topic can independently decide whether to read compacted

### Verifying this change
unit test:
org.apache.pulsar.io.PulsarFunctionE2ETest#testReadCompactedFunction
org.apache.pulsar.io.PulsarFunctionE2ETest#testReadCompactedSink
  • Loading branch information
315157973 authored Jul 2, 2020
1 parent 768813e commit 7a80ca9
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;

Expand All @@ -54,6 +55,8 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -73,6 +76,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -90,6 +94,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
Expand Down Expand Up @@ -501,6 +506,135 @@ public void testE2EPulsarFunctionWithUrl() throws Exception {
testE2EPulsarFunction(jarFilePathUrl);
}

@Test(timeOut = 30000)
public void testReadCompactedFunction() throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "PulsarFunction-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
final int messageNum = 20;
final int maxKeys = 10;
// 1 Setup producer
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(sourceTopic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();
// 2 Send messages and record the expected values after compaction
Map<String, String> expected = new HashMap<>();
for (int j = 0; j < messageNum; j++) {
String key = "key" + j % maxKeys;
String value = "my-message-" + key + j;
producer.newMessage().key(key).value(value).send();
//Duplicate keys will exist, the value of the new key will be retained
expected.put(key, value);
}
// 3 Trigger compaction
ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
twoPhaseCompactor.compact(sourceTopic).get();

// 4 Setup function
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
sourceTopic, sinkTopic, subscriptionName);
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = new ConsumerConfig();
Map<String,String> consumerProperties = new HashMap<>();
consumerProperties.put("readCompacted","true");
consumerConfig.setConsumerProperties(consumerProperties);
inputSpecs.put(sourceTopic, consumerConfig);
functionConfig.setInputSpecs(inputSpecs);
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

// 5 Function should only read compacted value,so we will only receive compacted messages
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sink-sub").subscribe();
int count = 0;
while (true) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
consumer.acknowledge(message);
count++;
Assert.assertEquals(expected.remove(message.getKey()) + "!", message.getValue());
}
Assert.assertEquals(count, maxKeys);
Assert.assertTrue(expected.isEmpty());

compactionScheduler.shutdownNow();
consumer.close();
producer.close();
}

@Test(timeOut = 30000)
public void testReadCompactedSink() throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic2";
final String sinkName = "PulsarFunction-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
final int messageNum = 20;
final int maxKeys = 10;
// 1 Setup producer
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(sourceTopic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();
// 2 Send messages and record the expected values after compaction
Map<String, String> expected = new HashMap<>();
for (int j = 0; j < messageNum; j++) {
String key = "key" + j % maxKeys;
String value = "my-message-" + key + j;
producer.newMessage().key(key).value(value).send();
//Duplicate keys will exist, the value of the new key will be retained
expected.put(key, value);
}
// 3 Trigger compaction
ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
twoPhaseCompactor.compact(sourceTopic).get();

// 4 Setup sink
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
Map<String,String> consumerProperties = new HashMap<>();
consumerProperties.put("readCompacted","true");
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().consumerProperties(consumerProperties).build()));
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);

// 5 Sink should only read compacted value,so we will only receive compacted messages
retryStrategically((test) -> {
try {
String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
Metric m = metrics.get("pulsar_sink_received_total");
return m.value == (double) maxKeys;
} catch (Exception e) {
return false;
}
}, 50, 1000);

compactionScheduler.shutdownNow();
producer.close();
}

@Test(timeOut = 30000)
private void testPulsarSinkDLQ() throws Exception {
final String namespacePortion = "io";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
Expand Down Expand Up @@ -235,6 +236,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String customSchemaInputString;
@Parameter(names = "--custom-schema-outputs", description = "The map of input topics to Schema properties (as a JSON string)")
protected String customSchemaOutputString;
@Parameter(names = "--input-specs", description = "The map of inputs to custom configuration (as a JSON string)")
protected String inputSpecs;
// for backwards compatibility purposes
@Parameter(names = "--outputSerdeClassName", description = "The SerDe class to be used for messages output by the function", hidden = true)
protected String DEPRECATED_outputSerdeClassName;
Expand Down Expand Up @@ -375,6 +378,10 @@ void processArguments() throws Exception {
Map<String, String> customSchemaOutputMap = new Gson().fromJson(customSchemaOutputString, type);
functionConfig.setCustomSchemaOutputs(customSchemaOutputMap);
}
if (null != inputSpecs) {
Type type = new TypeToken<Map<String, ConsumerConfig>>() {}.getType();
functionConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type));
}
if (null != topicsPattern) {
functionConfig.setTopicsPattern(topicsPattern);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
Expand Down Expand Up @@ -276,6 +277,9 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
protected String customSchemaInputString;

@Parameter(names = "--input-specs", description = "The map of inputs to custom configuration (as a JSON string)")
protected String inputSpecs;

@Parameter(names = "--max-redeliver-count", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue")
protected Integer maxMessageRetries;
@Parameter(names = "--dead-letter-topic", description = "Name of the dead topic where the failing messages will be sent.")
Expand Down Expand Up @@ -386,6 +390,11 @@ void processArguments() throws Exception {
sinkConfig.setTopicToSchemaType(customSchemaInputMap);
}

if(null != inputSpecs){
Type type = new TypeToken<Map<String, ConsumerConfig>>(){}.getType();
sinkConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type));
}

sinkConfig.setMaxMessageRetries(maxMessageRetries);
if (null != deadLetterTopic) {
sinkConfig.setDeadLetterTopic(deadLetterTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ConsumerConfig {
private boolean isRegexPattern;
@Builder.Default
private Map<String, String> schemaProperties = new HashMap<>();
@Builder.Default
private Map<String, String> consumerProperties = new HashMap<>();
private Integer receiverQueueSize;

public ConsumerConfig(String schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
consumerConfig.setSerdeClassName(conf.getSerdeClassName());
}
consumerConfig.setSchemaProperties(conf.getSchemaPropertiesMap());
consumerConfig.setConsumerProperties(conf.getConsumerPropertiesMap());
if (conf.hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(pulsarSourceConfig.getSubscriptionName())
.subscriptionInitialPosition(pulsarSourceConfig.getSubscriptionPosition())
.subscriptionType(pulsarSourceConfig.getSubscriptionType())
.messageListener(this);
.subscriptionType(pulsarSourceConfig.getSubscriptionType());

if (conf.getConsumerProperties() != null && !conf.getConsumerProperties().isEmpty()) {
cb.loadConf(new HashMap<>(conf.getConsumerProperties()));
}
//messageListener is annotated with @JsonIgnore,so setting messageListener should be put behind loadConf
cb.messageListener(this);

if (conf.isRegexPattern) {
cb = cb.topicsPattern(topic);
Expand All @@ -94,7 +99,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
if (pulsarSourceConfig.getTimeoutMs() != null) {
cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}

if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
Expand Down Expand Up @@ -169,7 +173,11 @@ Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundExcept
schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf, true);
}
configs.put(topic,
ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).build());
ConsumerConfig.<T> builder().
schema(schema).
isRegexPattern(conf.isRegexPattern()).
receiverQueueSize(conf.getReceiverQueueSize()).
consumerProperties(conf.getConsumerProperties()).build());
});

return configs;
Expand All @@ -189,6 +197,7 @@ private static class ConsumerConfig<T> {
private Schema<T> schema;
private boolean isRegexPattern;
private Integer receiverQueueSize;
private Map<String, String> consumerProperties;
}

}
3 changes: 3 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message ConsumerSpec {
}
ReceiverQueueSize receiverQueueSize = 4;
map<string, string> schemaProperties = 5;
map<string, string> consumerProperties = 6;
}

message SourceSpec {
Expand Down Expand Up @@ -149,6 +150,8 @@ message SinkSpec {
bool forwardSourceMessageProperty = 8;

map<string, string> schemaProperties = 9;

map<string, string> consumerProperties = 10;
}

message PackageLocationMetaData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
Function.ConsumerSpec.newBuilder()
.setSchemaType(consumerConfig.getSchemaType())
.putAllSchemaProperties(consumerConfig.getSchemaProperties())
.putAllConsumerProperties(consumerConfig.getConsumerProperties())
.setIsRegexPattern(false)
.build());
} catch (JsonProcessingException e) {
Expand All @@ -132,6 +133,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (consumerConf.getSchemaProperties() != null) {
bldr.putAllSchemaProperties(consumerConf.getSchemaProperties());
}
bldr.putAllConsumerProperties(consumerConf.getConsumerProperties());
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
});
}
Expand Down Expand Up @@ -182,6 +184,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (StringUtils.isNotEmpty(conf)) {
ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
sinkSpecBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(String.format("Incorrect custom schema outputs ,Topic %s ", functionConfig.getOutput()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
.setValue(spec.getReceiverQueueSize()).build());
}
bldr.putAllConsumerProperties(spec.getConsumerProperties());
sourceSpecBuilder.putInputSpecs(topic, bldr.build());
});
}
Expand Down Expand Up @@ -259,6 +260,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
}
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap());
consumerConfigMap.put(input.getKey(), consumerConfig);
}
sinkConfig.setInputSpecs(consumerConfigMap);
Expand Down

0 comments on commit 7a80ca9

Please sign in to comment.