From 65fe863f84a0446f369e3b5a44f9851c1f53c23d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Sun, 5 May 2019 07:49:03 +0800 Subject: [PATCH] [issue#4042] improve java functions API (#4093) Master Issue: #4042 Fixes #4042 Motivation improve java functions API, when you need to publish the fields in the TypedMessageBuilder, there is no need to add a new publish method, just modify the interface in the TypedMessageBuilder. --- .../worker/PulsarFunctionPublishTest.java | 2 +- pulsar-functions/api-java/pom.xml | 6 + .../apache/pulsar/functions/api/Context.java | 31 +-- .../functions/instance/ContextImpl.java | 228 ++++++++++++------ .../functions/instance/ContextImplTest.java | 2 +- .../api/examples/PublishFunction.java | 9 +- ...f.java => TypedMessageBuilderPublish.java} | 19 +- .../api/examples/UserPublishFunction.java | 10 +- ...nf.py => typed_message_builder_publish.py} | 2 +- .../windowing/WindowFunctionExecutor.java | 2 +- .../windowing/WindowFunctionExecutorTest.java | 8 + .../functions/PulsarFunctionsTestBase.java | 7 +- 12 files changed, 221 insertions(+), 105 deletions(-) rename pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/{PublishFunctionWithMessageConf.java => TypedMessageBuilderPublish.java} (71%) rename pulsar-functions/python-examples/{publish_function_with_message_conf.py => typed_message_builder_publish.py} (97%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index ef88fddc7e156..e3b41313188b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -279,7 +279,7 @@ protected static FunctionConfig createFunctionConfig(String tenant, String names functionConfig.setSubName(subscriptionName); functionConfig.setInputs(Collections.singleton(sourceTopic)); functionConfig.setAutoAck(true); - functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf"); + functionConfig.setClassName("org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish"); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); Map userConfig = new HashMap<>(); userConfig.put("publish-topic", publishTopic); diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index 6f0481db9c43b..33e2ea05c0a22 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -42,6 +42,12 @@ typetools test + + org.apache.pulsar + pulsar-client-api + ${project.version} + compile + diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 556086a271877..013b5f2da6e01 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -19,6 +19,10 @@ package org.apache.pulsar.functions.api; import java.nio.ByteBuffer; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.slf4j.Logger; import java.util.Collection; @@ -232,6 +236,7 @@ public interface Context { * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name * of the custom schema class * @return A future that completes when the framework is done publishing the message + * @deprecated in favor of using {@link #newOutputMessage(String, Schema)} */ CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName); @@ -241,28 +246,18 @@ public interface Context { * @param topicName The name of the topic for publishing * @param object The object that needs to be published * @return A future that completes when the framework is done publishing the message + * @deprecated in favor of using {@link #newOutputMessage(String, Schema)} */ CompletableFuture publish(String topicName, O object); /** - * Publish an object using serDe or schema class for serializing to the topic. - * - * @param topicName The name of the topic for publishing - * @param object The object that needs to be published - * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name - * of the custom schema class - * @param messageConf A map of configurations to set for the message that will be published - * The available options are: - * - * "key" - Parition Key - * "properties" - Map of properties - * "eventTime" - * "sequenceId" - * "replicationClusters" - * "disableReplication" + * New output message using schema for serializing to the topic * - * @return A future that completes when the framework is done publishing the message + * @param topicName The name of the topic for output message + * @param schema provide a way to convert between serialized data and domain objects + * @param + * @return the message builder instance + * @throws PulsarClientException */ - CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName, Map messageConf); - + TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException; } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 5c06b498c58f8..f0d5ade6cdf76 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -25,14 +25,7 @@ import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.HashingScheme; -import org.apache.pulsar.client.api.MessageRoutingMode; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; @@ -52,12 +45,9 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; @@ -94,11 +84,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { private final Summary userMetricsSummary; private final static String[] userMetricsLabelNames; + static { // add label to indicate user metric userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1); userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric"; } + private final ComponentType componentType; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, @@ -259,7 +251,7 @@ public String getSecret(String secretName) { return null; } } - + private void ensureStateEnabled() { checkState(null != stateContext, "State is not enabled."); } @@ -337,69 +329,26 @@ public CompletableFuture publish(String topicName, O object) { @SuppressWarnings("unchecked") @Override public CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName) { - return publish(topicName, object, schemaOrSerdeClassName, null); + return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false)); } @SuppressWarnings("unchecked") @Override - public CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName, Map messageConf) { - return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), messageConf); + public TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException { + MessageBuilderImpl messageBuilder = new MessageBuilderImpl<>(); + TypedMessageBuilder typedMessageBuilder = getProducer(topicName, schema).newMessage(); + messageBuilder.setUnderlyingBuilder(typedMessageBuilder); + return messageBuilder; } @SuppressWarnings("unchecked") - public CompletableFuture publish(String topicName, O object, Schema schema, Map messageConf) { - Producer producer = (Producer) publishProducers.get(topicName); - - if (producer == null) { - try { - Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) - .schema(schema) - .blockIfQueueFull(true) - .enableBatching(true) - .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) - .compressionType(CompressionType.LZ4) - .hashingScheme(HashingScheme.Murmur3_32Hash) // - .messageRoutingMode(MessageRoutingMode.CustomPartition) - .messageRouter(FunctionResultRouter.of()) - // set send timeout to be infinity to prevent potential deadlock with consumer - // that might happen when consumer is blocked due to unacked messages - .sendTimeout(0, TimeUnit.SECONDS) - .topic(topicName) - .properties(InstanceUtils.getProperties(componentType, - FunctionCommon.getFullyQualifiedName( - this.config.getFunctionDetails().getTenant(), - this.config.getFunctionDetails().getNamespace(), - this.config.getFunctionDetails().getName()), - this.config.getInstanceId())) - .create(); - - Producer existingProducer = (Producer) publishProducers.putIfAbsent(topicName, newProducer); - - if (existingProducer != null) { - // The value in the map was not updated after the concurrent put - newProducer.close(); - producer = existingProducer; - } else { - producer = newProducer; - } - - } catch (PulsarClientException e) { - logger.error("Failed to create Producer while doing user publish", e); - return FutureUtil.failedFuture(e); - } - } - - TypedMessageBuilder messageBuilder = producer.newMessage(); - if (messageConf != null) { - messageBuilder.loadConf(messageConf); + public CompletableFuture publish(String topicName, O object, Schema schema) { + try { + return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null); + } catch (PulsarClientException e) { + logger.error("Failed to create Producer while doing user publish", e); + return FutureUtil.failedFuture(e); } - CompletableFuture future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null); - future.exceptionally(e -> { - this.statsManager.incrSysExceptions(e); - logger.error("Failed to publish to topic {} with error {}", topicName, e); - return null; - }); - return future; } @Override @@ -417,6 +366,46 @@ public void recordMetric(String metricName, double value) { } } + private Producer getProducer(String topicName, Schema schema) throws PulsarClientException { + Producer producer = (Producer) publishProducers.get(topicName); + + if (producer == null) { + + Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) + .schema(schema) + .blockIfQueueFull(true) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .compressionType(CompressionType.LZ4) + .hashingScheme(HashingScheme.Murmur3_32Hash) // + .messageRoutingMode(MessageRoutingMode.CustomPartition) + .messageRouter(FunctionResultRouter.of()) + // set send timeout to be infinity to prevent potential deadlock with consumer + // that might happen when consumer is blocked due to unacked messages + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topicName) + .properties(InstanceUtils.getProperties(componentType, + FunctionCommon.getFullyQualifiedName( + this.config.getFunctionDetails().getTenant(), + this.config.getFunctionDetails().getNamespace(), + this.config.getFunctionDetails().getName()), + this.config.getInstanceId())) + .create(); + + Producer existingProducer = (Producer) publishProducers.putIfAbsent(topicName, newProducer); + + if (existingProducer != null) { + // The value in the map was not updated after the concurrent put + newProducer.close(); + producer = existingProducer; + } else { + producer = newProducer; + } + + } + return producer; + } + public Map getAndResetMetrics() { Map retval = getMetrics(); resetMetrics(); @@ -443,4 +432,105 @@ public Map getMetrics() { } return metricsMap; } + + class MessageBuilderImpl implements TypedMessageBuilder { + private TypedMessageBuilder underlyingBuilder; + @Override + public MessageId send() throws PulsarClientException { + try { + return sendAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture sendAsync() { + return underlyingBuilder.sendAsync() + .whenComplete((result, cause) -> { + if (null != cause) { + statsManager.incrSysExceptions(cause); + logger.error("Failed to publish to topic with error {}", cause); + } + }); + } + + @Override + public TypedMessageBuilder key(String key) { + underlyingBuilder.key(key); + return this; + } + + @Override + public TypedMessageBuilder keyBytes(byte[] key) { + underlyingBuilder.keyBytes(key); + return this; + } + + @Override + public TypedMessageBuilder orderingKey(byte[] orderingKey) { + underlyingBuilder.orderingKey(orderingKey); + return this; + } + + @Override + public TypedMessageBuilder value(O value) { + underlyingBuilder.value(value); + return this; + } + + @Override + public TypedMessageBuilder property(String name, String value) { + underlyingBuilder.property(name, value); + return this; + } + + @Override + public TypedMessageBuilder properties(Map properties) { + underlyingBuilder.properties(properties); + return this; + } + + @Override + public TypedMessageBuilder eventTime(long timestamp) { + underlyingBuilder.eventTime(timestamp); + return this; + } + + @Override + public TypedMessageBuilder sequenceId(long sequenceId) { + underlyingBuilder.sequenceId(sequenceId); + return this; + } + + @Override + public TypedMessageBuilder replicationClusters(List clusters) { + underlyingBuilder.replicationClusters(clusters); + return this; + } + + @Override + public TypedMessageBuilder disableReplication() { + underlyingBuilder.disableReplication(); + return this; + } + + @Override + public TypedMessageBuilder loadConf(Map config) { + underlyingBuilder.loadConf(config); + return this; + } + + public void setUnderlyingBuilder(TypedMessageBuilder underlyingBuilder) { + this.underlyingBuilder = underlyingBuilder; + } + } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 9d2579c78b6af..7eb7aae5305f6 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -153,6 +153,6 @@ public void testGetStateStateEnabled() throws Exception { @Test public void testPublishUsingDefaultSchema() throws Exception { - context.publish("sometopic", "Somevalue"); + context.newOutputMessage("sometopic", null).value("Somevalue").sendAsync(); } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java index cda83313c80ed..149cce60cef0f 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.functions.api.examples; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -30,7 +33,11 @@ public class PublishFunction implements Function { public Void process(String input, Context context) { String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic"); String output = String.format("%s!", input); - context.publish(publishTopic, output); + try { + context.newOutputMessage(publishTopic, Schema.STRING).value(output).sendAsync(); + } catch (PulsarClientException e) { + context.getLogger().error(e.toString()); + } return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java similarity index 71% rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java index 9960552cbee69..327d37187b980 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.functions.api.examples; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -30,7 +32,7 @@ * to publish to a desired topic based on config and setting various message configurations to be passed along. * */ -public class PublishFunctionWithMessageConf implements Function { +public class TypedMessageBuilderPublish implements Function { @Override public Void process(String input, Context context) { String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic"); @@ -40,13 +42,16 @@ public Void process(String input, Context context) { properties.put("input_topic", context.getCurrentRecord().getTopicName().get()); properties.putAll(context.getCurrentRecord().getProperties()); - Map messageConf = new HashMap<>(); - messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties); - if (context.getCurrentRecord().getKey().isPresent()) { - messageConf.put(TypedMessageBuilder.CONF_KEY, context.getCurrentRecord().getKey().get()); + try { + TypedMessageBuilder messageBuilder = context.newOutputMessage(publishTopic, Schema.STRING). + value(output).properties(properties); + if (context.getCurrentRecord().getKey().isPresent()){ + messageBuilder.key(context.getCurrentRecord().getKey().get()); + } + messageBuilder.eventTime(System.currentTimeMillis()).sendAsync(); + } catch (PulsarClientException e) { + context.getLogger().error(e.toString()); } - messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME, System.currentTimeMillis()); - context.publish(publishTopic, output, null, messageConf); return null; } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java index ec3fd95ca80b3..41c4d7dce0fe4 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java @@ -20,6 +20,8 @@ import java.util.Optional; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -31,8 +33,12 @@ public class UserPublishFunction implements Function { @Override public Void process(String input, Context context) { Optional topicToWrite = context.getUserConfigValue("topic"); - if (topicToWrite.get() != null) { - context.publish((String) topicToWrite.get(), input); + if (topicToWrite.isPresent()) { + try { + context.newOutputMessage((String) topicToWrite.get(), Schema.STRING).value(input).sendAsync(); + } catch (PulsarClientException e) { + e.printStackTrace(); + } } return null; } diff --git a/pulsar-functions/python-examples/publish_function_with_message_conf.py b/pulsar-functions/python-examples/typed_message_builder_publish.py similarity index 97% rename from pulsar-functions/python-examples/publish_function_with_message_conf.py rename to pulsar-functions/python-examples/typed_message_builder_publish.py index 79aac0239fa55..c6697a716d071 100644 --- a/pulsar-functions/python-examples/publish_function_with_message_conf.py +++ b/pulsar-functions/python-examples/typed_message_builder_publish.py @@ -23,7 +23,7 @@ # Example function that uses the built in publish function in the context # to publish to a desired topic based on config -class PublishFunctionWithMessageConf(Function): +class TypedMessageBuilderPublish(Function): def __init__(self): pass diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index 19459499cd64e..0fcc33e59c0bb 100644 --- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java @@ -282,7 +282,7 @@ public O process(I input, Context context) throws Exception { this.windowManager.add(record, ts, record); } else { if (this.windowConfig.getLateDataTopic() != null) { - context.publish(this.windowConfig.getLateDataTopic(), input); + context.newOutputMessage(this.windowConfig.getLateDataTopic(), null).value(input).sendAsync(); } else { log.info(String.format( "Received a late tuple %s with ts %d. This will not be " + "processed" diff --git a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java index 88ecebe53650f..ac96b44a65f62 100644 --- a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java +++ b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java @@ -20,6 +20,7 @@ import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Record; @@ -36,8 +37,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -204,6 +208,10 @@ public void testExecuteWithLateTupleStream() throws Exception { windowConfig.setLateDataTopic("$late"); Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))) .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY); + TypedMessageBuilder typedMessageBuilder = Mockito.mock(TypedMessageBuilder.class); + Mockito.when(typedMessageBuilder.value(anyString())).thenReturn(typedMessageBuilder); + Mockito.when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf()); + Mockito.when(context.newOutputMessage(anyString(), anyObject())).thenReturn(typedMessageBuilder); long[] timestamps = {603, 605, 607, 618, 626, 636, 600}; List events = new ArrayList<>(timestamps.length); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index 823b7d1bce298..48b39321b707c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -74,7 +74,7 @@ public void teardownFunctionWorkers() { "org.apache.pulsar.functions.api.examples.ExclamationFunction"; public static final String PUBLISH_JAVA_CLASS = - "org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf"; + "org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish"; public static final String EXCEPTION_JAVA_CLASS = "org.apache.pulsar.tests.integration.functions.ExceptionFunction"; @@ -89,14 +89,13 @@ public void teardownFunctionWorkers() { public static final String EXCLAMATION_PYTHONZIP_CLASS = "exclamation"; + public static final String PUBLISH_PYTHON_CLASS = "typed_message_builder_publish.TypedMessageBuilderPublish"; public static final String EXCEPTION_PYTHON_CLASS = "exception_function"; - public static final String PUBLISH_PYTHON_CLASS = "publish_function_with_message_conf.PublishFunctionWithMessageConf"; - public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py"; public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py"; public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip"; - public static final String PUBLISH_FUNCTION_PYTHON_FILE = "publish_function_with_message_conf.py"; + public static final String PUBLISH_FUNCTION_PYTHON_FILE = "typed_message_builder_publish.py"; public static final String EXCEPTION_FUNCTION_PYTHON_FILE = "exception_function.py"; protected static String getExclamationClass(Runtime runtime,