From 7cd28b9d5cc862fe239f5e8fa1ca616237764607 Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Fri, 8 May 2020 11:04:09 +0800 Subject: [PATCH] Support function with format: Function> (#6684) Fixes #6519 ### Motivation Currently, Pulsar Functions not support Async mode, e.g. user passed in a Function in format : ``` Function> ``` This kind of function is useful if the function might use RPCs to call external systems. e.g. ```java public class AsyncFunction implements Function> { CompletableFuture apply (String input) { CompletableFuture future = new CompletableFuture(); ...function compute... future.whenComplete(() -> { ... call external system ... }); return future; } ``` ### Modifications - add support for Async Functions support. ### Verifying this change current ut passed. * support func: Function> * add 2 examples * add limit to the max outstanding items --- .../common/functions/FunctionConfig.java | 3 + .../functions/instance/InstanceConfig.java | 3 + .../functions/instance/JavaInstance.java | 64 ++++++++++++- .../instance/JavaInstanceRunnable.java | 41 ++++---- .../functions/instance/JavaInstanceTest.java | 96 +++++++++++++++++-- .../api/examples/AsyncContextFunction.java | 59 ++++++++++++ .../JavaNativeAsyncExclamationFunction.java | 41 ++++++++ .../apache/pulsar/functions/LocalRunner.java | 2 + .../runtime/JavaInstanceStarter.java | 4 + .../pulsar/functions/worker/WorkerConfig.java | 6 ++ .../functions/worker/FunctionActioner.java | 1 + 11 files changed, 289 insertions(+), 31 deletions(-) create mode 100644 pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java create mode 100644 pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index d0b72ab1b9a8e..7b8a48b6e07b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -113,4 +113,7 @@ public enum Runtime { // to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer // interface private String customRuntimeOptions; + // Max pending async requests per instance to avoid large number of concurrent requests. + // Only used in AsyncFunction. Default: 1000. + private Integer maxPendingAsyncRequests = 1000; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java index 14dadacf6cfae..1f341732824e6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java @@ -36,6 +36,9 @@ public class InstanceConfig { private Function.FunctionAuthenticationSpec functionAuthenticationSpec; private int port; private String clusterName; + // Max pending async requests per instance to avoid large number of concurrent requests. + // Only used in AsyncFunction. Default: 1000 + private int maxPendingAsyncRequests = 1000; /** * Get the string representation of {@link #getInstanceId()}. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 8aee702a9fd49..1e18a0763d465 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.functions.instance; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -40,9 +45,18 @@ public class JavaInstance implements AutoCloseable { private Function function; private java.util.function.Function javaUtilFunction; - public JavaInstance(ContextImpl contextImpl, Object userClassObject) { + // for Async function max out standing items + private final InstanceConfig instanceConfig; + private final Executor executor; + @Getter + private final LinkedBlockingQueue> pendingAsyncRequests; + + public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) { this.context = contextImpl; + this.instanceConfig = instanceConfig; + this.executor = Executors.newSingleThreadExecutor(); + this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests()); // create the functions if (userClassObject instanceof Function) { @@ -52,23 +66,63 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject) { } } - public JavaExecutionResult handleMessage(Record record, Object input) { + public CompletableFuture handleMessage(Record record, Object input) { if (context != null) { context.setCurrentMessageContext(record); } + + final CompletableFuture future = new CompletableFuture<>(); JavaExecutionResult executionResult = new JavaExecutionResult(); + + final Object output; + try { - Object output; if (function != null) { output = function.process(input, context); } else { output = javaUtilFunction.apply(input); } - executionResult.setResult(output); } catch (Exception ex) { executionResult.setUserException(ex); + future.complete(executionResult); + return future; + } + + if (output instanceof CompletableFuture) { + // Function is in format: Function> + try { + pendingAsyncRequests.put((CompletableFuture) output); + } catch (InterruptedException ie) { + log.warn("Exception while put Async requests", ie); + executionResult.setUserException(ie); + future.complete(executionResult); + return future; + } + + ((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> { + if (log.isDebugEnabled()) { + log.debug("Got result async: object: {}, throwable: {}", obj, throwable); + } + + if (throwable != null) { + executionResult.setUserException(new Exception((Throwable)throwable)); + pendingAsyncRequests.remove(output); + future.complete(executionResult); + return; + } + executionResult.setResult(obj); + pendingAsyncRequests.remove(output); + future.complete(executionResult); + }, executor); + } else { + if (log.isDebugEnabled()) { + log.debug("Got result: object: {}", output); + } + executionResult.setResult(output); + future.complete(executionResult); } - return executionResult; + + return future; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 25689a93f3fe7..66cdb60c01bec 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; +import java.util.concurrent.CompletableFuture; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -214,7 +215,7 @@ JavaInstance setupJavaInstance() throws Exception { // start any log topic handler setupLogHandler(); - return new JavaInstance(contextImpl, object); + return new JavaInstance(contextImpl, object, instanceConfig); } ContextImpl setupContext() { @@ -254,7 +255,7 @@ public void run() { } addLogTopicHandler(); - JavaExecutionResult result; + CompletableFuture result; // set last invocation time stats.setLastInvocation(System.currentTimeMillis()); @@ -272,10 +273,6 @@ public void run() { removeLogTopicHandler(); - if (log.isDebugEnabled()) { - log.debug("Got result: {}", result.getResult()); - } - try { processResult(currentRecord, result); } catch (Exception e) { @@ -415,23 +412,27 @@ private void setupStateTable() throws Exception { } private void processResult(Record srcRecord, - JavaExecutionResult result) throws Exception { - if (result.getUserException() != null) { - log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException()); - stats.incrUserExceptions(result.getUserException()); - srcRecord.fail(); - } else { - if (result.getResult() != null) { - sendOutputMessage(srcRecord, result.getResult()); + CompletableFuture result) throws Exception { + result.whenComplete((result1, throwable) -> { + if (throwable != null || result1.getUserException() != null) { + Throwable t = throwable != null ? throwable : result1.getUserException(); + log.warn("Encountered exception when processing message {}", + srcRecord, t); + stats.incrUserExceptions(t); + srcRecord.fail(); } else { - if (instanceConfig.getFunctionDetails().getAutoAck()) { - // the function doesn't produce any result or the user doesn't want the result. - srcRecord.ack(); + if (result1.getResult() != null) { + sendOutputMessage(srcRecord, result1.getResult()); + } else { + if (instanceConfig.getFunctionDetails().getAutoAck()) { + // the function doesn't produce any result or the user doesn't want the result. + srcRecord.ack(); + } } + // increment total successfully processed + stats.incrTotalProcessedSuccessfully(); } - // increment total successfully processed - stats.incrTotalProcessedSuccessfully(); - } + }); } private void sendOutputMessage(Record srcRecord, Object output) { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 0cb361de67fa8..5061d1e2144a5 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -22,10 +22,14 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.testng.annotations.Test; +@Slf4j public class JavaInstanceTest { /** @@ -33,14 +37,94 @@ public class JavaInstanceTest { * @throws Exception */ @Test - public void testLambda() { + public void testLambda() throws Exception { JavaInstance instance = new JavaInstance( - mock(ContextImpl.class), - (Function) (input, context) -> input + "-lambda"); + mock(ContextImpl.class), + (Function) (input, context) -> input + "-lambda", + new InstanceConfig()); String testString = "ABC123"; - JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); - assertNotNull(result.getResult()); - assertEquals(new String(testString + "-lambda"), result.getResult()); + CompletableFuture result = instance.handleMessage(mock(Record.class), testString); + assertNotNull(result.get().getResult()); + assertEquals(new String(testString + "-lambda"), result.get().getResult()); + instance.close(); + } + + @Test + public void testAsyncFunction() throws Exception { + InstanceConfig instanceConfig = new InstanceConfig(); + + Function> function = (input, context) -> { + log.info("input string: {}", input); + CompletableFuture result = new CompletableFuture<>(); + Executors.newCachedThreadPool().submit(() -> { + try { + Thread.sleep(500); + result.complete(String.format("%s-lambda", input)); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + function, + instanceConfig); + String testString = "ABC123"; + CompletableFuture result = instance.handleMessage(mock(Record.class), testString); + assertNotNull(result.get().getResult()); + assertEquals(new String(testString + "-lambda"), result.get().getResult()); + instance.close(); + } + + @Test + public void testAsyncFunctionMaxPending() throws Exception { + InstanceConfig instanceConfig = new InstanceConfig(); + int pendingQueueSize = 2; + instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize); + + Function> function = (input, context) -> { + log.info("input string: {}", input); + CompletableFuture result = new CompletableFuture<>(); + Executors.newCachedThreadPool().submit(() -> { + try { + Thread.sleep(500); + result.complete(String.format("%s-lambda", input)); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + function, + instanceConfig); + String testString = "ABC123"; + + long startTime = System.currentTimeMillis(); + assertEquals(pendingQueueSize, instance.getPendingAsyncRequests().remainingCapacity()); + CompletableFuture result1 = instance.handleMessage(mock(Record.class), testString); + assertEquals(pendingQueueSize - 1, instance.getPendingAsyncRequests().remainingCapacity()); + CompletableFuture result2 = instance.handleMessage(mock(Record.class), testString); + assertEquals(pendingQueueSize - 2, instance.getPendingAsyncRequests().remainingCapacity()); + CompletableFuture result3 = instance.handleMessage(mock(Record.class), testString); + // no space left + assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity()); + + instance.getPendingAsyncRequests().remainingCapacity(); + assertNotNull(result1.get().getResult()); + assertNotNull(result2.get().getResult()); + assertNotNull(result3.get().getResult()); + + assertEquals(new String(testString + "-lambda"), result1.get().getResult()); + long endTime = System.currentTimeMillis(); + + log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime); instance.close(); } } diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java new file mode 100644 index 0000000000000..b70bc7c4ac02c --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.slf4j.Logger; + +public class AsyncContextFunction implements Function> { + @Override + public CompletableFuture process(String input, Context context) { + Logger LOG = context.getLogger(); + CompletableFuture future = new CompletableFuture(); + + // this method only delay a function execute. + Executors.newCachedThreadPool().submit(() -> { + try { + Thread.sleep(500); + } catch (Exception e) { + LOG.error("Exception when Thread.sleep", e); + future.completeExceptionally(e); + } + + String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", ")); + String funcName = context.getFunctionName(); + + String logMessage = String + .format("A message with value of \"%s\" has arrived on one of the following topics: %s\n", + input, inputTopics); + LOG.info(logMessage); + + String metricName = String.format("function-%s-messages-received", funcName); + context.recordMetric(metricName, 1); + + future.complete(null); + }); + + return future; + } +} diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java new file mode 100644 index 0000000000000..7cad46be98c05 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.function.Function; + +public class JavaNativeAsyncExclamationFunction implements Function> { + @Override + public CompletableFuture apply(String input) { + CompletableFuture future = new CompletableFuture(); + + Executors.newCachedThreadPool().submit(() -> { + try { + Thread.sleep(500); + future.complete(String.format("%s-!!", input)); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + + return future; + } +} diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index b9ac10eea6918..2d8b72f9817cd 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -358,6 +358,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio instanceConfig.setMaxBufferedTuples(1024); instanceConfig.setPort(FunctionCommon.findAvailablePort()); instanceConfig.setClusterName("local"); + instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests()); RuntimeSpawner runtimeSpawner = new RuntimeSpawner( instanceConfig, userCodeFile, @@ -417,6 +418,7 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi instanceConfig.setMaxBufferedTuples(1024); instanceConfig.setPort(FunctionCommon.findAvailablePort()); instanceConfig.setClusterName("local"); + instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests()); RuntimeSpawner runtimeSpawner = new RuntimeSpawner( instanceConfig, userCodeFile, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index ec2e36a9b9f6e..970047fcd7332 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -125,6 +125,9 @@ public class JavaInstanceStarter implements AutoCloseable { @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true) public String clusterName; + @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false) + public int maxPendingAsyncRequests = 1000; + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; @@ -147,6 +150,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL instanceConfig.setInstanceId(instanceId); instanceConfig.setMaxBufferedTuples(maxBufferedTuples); instanceConfig.setClusterName(clusterName); + instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests); Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder(); if (functionDetailsJsonString.charAt(0) == '\'') { functionDetailsJsonString = functionDetailsJsonString.substring(1); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 4ffffe9329a02..7cd718652104e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -409,6 +409,12 @@ public String getFunctionAuthProviderClassName() { ) private Map runtimeCustomizerConfig = Collections.emptyMap(); + @FieldContext( + doc = "Max pending async requests per instance to avoid large number of concurrent requests." + + "Only used in AsyncFunction. Default: 1000" + ) + private int maxPendingAsyncRequests = 1000; + public String getFunctionMetadataTopic() { return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 5c4f8712dea61..eef95a1a72215 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -177,6 +177,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu instanceConfig.setPort(FunctionCommon.findAvailablePort()); instanceConfig.setClusterName(clusterName); instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec); + instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests()); return instanceConfig; }