Skip to content

Commit

Permalink
[issue#4042] improve java functions API (apache#4093)
Browse files Browse the repository at this point in the history
Master Issue: apache#4042

Fixes apache#4042

Motivation

improve java functions API, when you need to publish the fields in the TypedMessageBuilder, there is no need to add a new publish method, just modify the interface in the TypedMessageBuilder.
  • Loading branch information
wolfstudy authored and sijie committed May 4, 2019
1 parent e9619fa commit 65fe863
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ protected static FunctionConfig createFunctionConfig(String tenant, String names
functionConfig.setSubName(subscriptionName);
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setAutoAck(true);
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish");
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
Map<String, Object> userConfig = new HashMap<>();
userConfig.put("publish-topic", publishTopic);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/api-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<artifactId>typetools</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pulsar.functions.api;

import java.nio.ByteBuffer;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;

import java.util.Collection;
Expand Down Expand Up @@ -232,6 +236,7 @@ public interface Context {
* @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
* of the custom schema class
* @return A future that completes when the framework is done publishing the message
* @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);

Expand All @@ -241,28 +246,18 @@ public interface Context {
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
* @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
*/
<O> CompletableFuture<Void> publish(String topicName, O object);

/**
* Publish an object using serDe or schema class for serializing to the topic.
*
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
* of the custom schema class
* @param messageConf A map of configurations to set for the message that will be published
* The available options are:
*
* "key" - Parition Key
* "properties" - Map of properties
* "eventTime"
* "sequenceId"
* "replicationClusters"
* "disableReplication"
* New output message using schema for serializing to the topic
*
* @return A future that completes when the framework is done publishing the message
* @param topicName The name of the topic for output message
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the message builder instance
* @throws PulsarClientException
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf);

<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
Expand All @@ -52,12 +45,9 @@
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -94,11 +84,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final Summary userMetricsSummary;

private final static String[] userMetricsLabelNames;

static {
// add label to indicate user metric
userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}

private final ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
Expand Down Expand Up @@ -259,7 +251,7 @@ public String getSecret(String secretName) {
return null;
}
}

private void ensureStateEnabled() {
checkState(null != stateContext, "State is not enabled.");
}
Expand Down Expand Up @@ -337,69 +329,26 @@ public <O> CompletableFuture<Void> publish(String topicName, O object) {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
return publish(topicName, object, schemaOrSerdeClassName, null);
return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
}

@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf) {
return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), messageConf);
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName, schema).newMessage();
messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
return messageBuilder;
}

@SuppressWarnings("unchecked")
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema, Map<String, Object> messageConf) {
Producer<O> producer = (Producer<O>) publishProducers.get(topicName);

if (producer == null) {
try {
Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
.schema(schema)
.blockIfQueueFull(true)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
// set send timeout to be infinity to prevent potential deadlock with consumer
// that might happen when consumer is blocked due to unacked messages
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.properties(InstanceUtils.getProperties(componentType,
FunctionCommon.getFullyQualifiedName(
this.config.getFunctionDetails().getTenant(),
this.config.getFunctionDetails().getNamespace(),
this.config.getFunctionDetails().getName()),
this.config.getInstanceId()))
.create();

Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);

if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
newProducer.close();
producer = existingProducer;
} else {
producer = newProducer;
}

} catch (PulsarClientException e) {
logger.error("Failed to create Producer while doing user publish", e);
return FutureUtil.failedFuture(e);
}
}

TypedMessageBuilder<O> messageBuilder = producer.newMessage();
if (messageConf != null) {
messageBuilder.loadConf(messageConf);
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
try {
return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
} catch (PulsarClientException e) {
logger.error("Failed to create Producer while doing user publish", e);
return FutureUtil.failedFuture(e);
}
CompletableFuture<Void> future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
future.exceptionally(e -> {
this.statsManager.incrSysExceptions(e);
logger.error("Failed to publish to topic {} with error {}", topicName, e);
return null;
});
return future;
}

@Override
Expand All @@ -417,6 +366,46 @@ public void recordMetric(String metricName, double value) {
}
}

private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
Producer<O> producer = (Producer<O>) publishProducers.get(topicName);

if (producer == null) {

Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
.schema(schema)
.blockIfQueueFull(true)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
// set send timeout to be infinity to prevent potential deadlock with consumer
// that might happen when consumer is blocked due to unacked messages
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.properties(InstanceUtils.getProperties(componentType,
FunctionCommon.getFullyQualifiedName(
this.config.getFunctionDetails().getTenant(),
this.config.getFunctionDetails().getNamespace(),
this.config.getFunctionDetails().getName()),
this.config.getInstanceId()))
.create();

Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);

if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
newProducer.close();
producer = existingProducer;
} else {
producer = newProducer;
}

}
return producer;
}

public Map<String, Double> getAndResetMetrics() {
Map<String, Double> retval = getMetrics();
resetMetrics();
Expand All @@ -443,4 +432,105 @@ public Map<String, Double> getMetrics() {
}
return metricsMap;
}

class MessageBuilderImpl<O> implements TypedMessageBuilder<O> {
private TypedMessageBuilder<O> underlyingBuilder;
@Override
public MessageId send() throws PulsarClientException {
try {
return sendAsync().get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
}
}

@Override
public CompletableFuture<MessageId> sendAsync() {
return underlyingBuilder.sendAsync()
.whenComplete((result, cause) -> {
if (null != cause) {
statsManager.incrSysExceptions(cause);
logger.error("Failed to publish to topic with error {}", cause);
}
});
}

@Override
public TypedMessageBuilder<O> key(String key) {
underlyingBuilder.key(key);
return this;
}

@Override
public TypedMessageBuilder<O> keyBytes(byte[] key) {
underlyingBuilder.keyBytes(key);
return this;
}

@Override
public TypedMessageBuilder<O> orderingKey(byte[] orderingKey) {
underlyingBuilder.orderingKey(orderingKey);
return this;
}

@Override
public TypedMessageBuilder<O> value(O value) {
underlyingBuilder.value(value);
return this;
}

@Override
public TypedMessageBuilder<O> property(String name, String value) {
underlyingBuilder.property(name, value);
return this;
}

@Override
public TypedMessageBuilder<O> properties(Map<String, String> properties) {
underlyingBuilder.properties(properties);
return this;
}

@Override
public TypedMessageBuilder<O> eventTime(long timestamp) {
underlyingBuilder.eventTime(timestamp);
return this;
}

@Override
public TypedMessageBuilder<O> sequenceId(long sequenceId) {
underlyingBuilder.sequenceId(sequenceId);
return this;
}

@Override
public TypedMessageBuilder<O> replicationClusters(List<String> clusters) {
underlyingBuilder.replicationClusters(clusters);
return this;
}

@Override
public TypedMessageBuilder<O> disableReplication() {
underlyingBuilder.disableReplication();
return this;
}

@Override
public TypedMessageBuilder<O> loadConf(Map<String, Object> config) {
underlyingBuilder.loadConf(config);
return this;
}

public void setUnderlyingBuilder(TypedMessageBuilder<O> underlyingBuilder) {
this.underlyingBuilder = underlyingBuilder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,6 @@ public void testGetStateStateEnabled() throws Exception {

@Test
public void testPublishUsingDefaultSchema() throws Exception {
context.publish("sometopic", "Somevalue");
context.newOutputMessage("sometopic", null).value("Somevalue").sendAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

Expand All @@ -30,7 +33,11 @@ public class PublishFunction implements Function<String, Void> {
public Void process(String input, Context context) {
String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
String output = String.format("%s!", input);
context.publish(publishTopic, output);
try {
context.newOutputMessage(publishTopic, Schema.STRING).value(output).sendAsync();
} catch (PulsarClientException e) {
context.getLogger().error(e.toString());
}
return null;
}
}
Loading

0 comments on commit 65fe863

Please sign in to comment.