Skip to content

Commit

Permalink
Support function with format: Function<I, CompletableFuture<O>> (apac…
Browse files Browse the repository at this point in the history
…he#6684)

Fixes apache#6519

### Motivation

Currently, Pulsar Functions not support Async mode, e.g. user passed in a Function in format :
```
Function<I, CompletableFuture<O>>
```
This kind of function is useful if the function might use RPCs to call external systems.

e.g.
```java
public class AsyncFunction implements Function<String, CompletableFuture<O>> {
    CompletableFuture<O> apply (String input) {
        CompletableFuture future = new CompletableFuture();
        ...function compute...
        future.whenComplete(() -> {
            ...  call external system  ...
        });
        return future;
    }
```

### Modifications
- add support for Async Functions support.

### Verifying this change
current ut passed.



* support func: Function<I, CompletableFuture<O>>

* add 2 examples

* add limit to the max outstanding items
  • Loading branch information
jiazhai authored May 8, 2020
1 parent b9e9609 commit 7cd28b9
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,7 @@ public enum Runtime {
// to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
// interface
private String customRuntimeOptions;
// Max pending async requests per instance to avoid large number of concurrent requests.
// Only used in AsyncFunction. Default: 1000.
private Integer maxPendingAsyncRequests = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class InstanceConfig {
private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
private int port;
private String clusterName;
// Max pending async requests per instance to avoid large number of concurrent requests.
// Only used in AsyncFunction. Default: 1000
private int maxPendingAsyncRequests = 1000;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package org.apache.pulsar.functions.instance;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -40,9 +45,18 @@ public class JavaInstance implements AutoCloseable {
private Function function;
private java.util.function.Function javaUtilFunction;

public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
// for Async function max out standing items
private final InstanceConfig instanceConfig;
private final Executor executor;
@Getter
private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;

public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {

this.context = contextImpl;
this.instanceConfig = instanceConfig;
this.executor = Executors.newSingleThreadExecutor();
this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());

// create the functions
if (userClassObject instanceof Function) {
Expand All @@ -52,23 +66,63 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
}
}

public JavaExecutionResult handleMessage(Record<?> record, Object input) {
public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
if (context != null) {
context.setCurrentMessageContext(record);
}

final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
JavaExecutionResult executionResult = new JavaExecutionResult();

final Object output;

try {
Object output;
if (function != null) {
output = function.process(input, context);
} else {
output = javaUtilFunction.apply(input);
}
executionResult.setResult(output);
} catch (Exception ex) {
executionResult.setUserException(ex);
future.complete(executionResult);
return future;
}

if (output instanceof CompletableFuture) {
// Function is in format: Function<I, CompletableFuture<O>>
try {
pendingAsyncRequests.put((CompletableFuture) output);
} catch (InterruptedException ie) {
log.warn("Exception while put Async requests", ie);
executionResult.setUserException(ie);
future.complete(executionResult);
return future;
}

((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> {
if (log.isDebugEnabled()) {
log.debug("Got result async: object: {}, throwable: {}", obj, throwable);
}

if (throwable != null) {
executionResult.setUserException(new Exception((Throwable)throwable));
pendingAsyncRequests.remove(output);
future.complete(executionResult);
return;
}
executionResult.setResult(obj);
pendingAsyncRequests.remove(output);
future.complete(executionResult);
}, executor);
} else {
if (log.isDebugEnabled()) {
log.debug("Got result: object: {}", output);
}
executionResult.setResult(output);
future.complete(executionResult);
}
return executionResult;

return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -214,7 +215,7 @@ JavaInstance setupJavaInstance() throws Exception {
// start any log topic handler
setupLogHandler();

return new JavaInstance(contextImpl, object);
return new JavaInstance(contextImpl, object, instanceConfig);
}

ContextImpl setupContext() {
Expand Down Expand Up @@ -254,7 +255,7 @@ public void run() {
}

addLogTopicHandler();
JavaExecutionResult result;
CompletableFuture<JavaExecutionResult> result;

// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());
Expand All @@ -272,10 +273,6 @@ public void run() {

removeLogTopicHandler();

if (log.isDebugEnabled()) {
log.debug("Got result: {}", result.getResult());
}

try {
processResult(currentRecord, result);
} catch (Exception e) {
Expand Down Expand Up @@ -415,23 +412,27 @@ private void setupStateTable() throws Exception {
}

private void processResult(Record srcRecord,
JavaExecutionResult result) throws Exception {
if (result.getUserException() != null) {
log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException());
stats.incrUserExceptions(result.getUserException());
srcRecord.fail();
} else {
if (result.getResult() != null) {
sendOutputMessage(srcRecord, result.getResult());
CompletableFuture<JavaExecutionResult> result) throws Exception {
result.whenComplete((result1, throwable) -> {
if (throwable != null || result1.getUserException() != null) {
Throwable t = throwable != null ? throwable : result1.getUserException();
log.warn("Encountered exception when processing message {}",
srcRecord, t);
stats.incrUserExceptions(t);
srcRecord.fail();
} else {
if (instanceConfig.getFunctionDetails().getAutoAck()) {
// the function doesn't produce any result or the user doesn't want the result.
srcRecord.ack();
if (result1.getResult() != null) {
sendOutputMessage(srcRecord, result1.getResult());
} else {
if (instanceConfig.getFunctionDetails().getAutoAck()) {
// the function doesn't produce any result or the user doesn't want the result.
srcRecord.ack();
}
}
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
});
}

private void sendOutputMessage(Record srcRecord, Object output) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,109 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;

@Slf4j
public class JavaInstanceTest {

/**
* Verify that be able to run lambda functions.
* @throws Exception
*/
@Test
public void testLambda() {
public void testLambda() throws Exception {
JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
(Function<String, String>) (input, context) -> input + "-lambda");
mock(ContextImpl.class),
(Function<String, String>) (input, context) -> input + "-lambda",
new InstanceConfig());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
assertNotNull(result.getResult());
assertEquals(new String(testString + "-lambda"), result.getResult());
CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
assertNotNull(result.get().getResult());
assertEquals(new String(testString + "-lambda"), result.get().getResult());
instance.close();
}

@Test
public void testAsyncFunction() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(500);
result.complete(String.format("%s-lambda", input));
} catch (Exception e) {
result.completeExceptionally(e);
}
});

return result;
};

JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
function,
instanceConfig);
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
assertNotNull(result.get().getResult());
assertEquals(new String(testString + "-lambda"), result.get().getResult());
instance.close();
}

@Test
public void testAsyncFunctionMaxPending() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();
int pendingQueueSize = 2;
instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(500);
result.complete(String.format("%s-lambda", input));
} catch (Exception e) {
result.completeExceptionally(e);
}
});

return result;
};

JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
function,
instanceConfig);
String testString = "ABC123";

long startTime = System.currentTimeMillis();
assertEquals(pendingQueueSize, instance.getPendingAsyncRequests().remainingCapacity());
CompletableFuture<JavaExecutionResult> result1 = instance.handleMessage(mock(Record.class), testString);
assertEquals(pendingQueueSize - 1, instance.getPendingAsyncRequests().remainingCapacity());
CompletableFuture<JavaExecutionResult> result2 = instance.handleMessage(mock(Record.class), testString);
assertEquals(pendingQueueSize - 2, instance.getPendingAsyncRequests().remainingCapacity());
CompletableFuture<JavaExecutionResult> result3 = instance.handleMessage(mock(Record.class), testString);
// no space left
assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity());

instance.getPendingAsyncRequests().remainingCapacity();
assertNotNull(result1.get().getResult());
assertNotNull(result2.get().getResult());
assertNotNull(result3.get().getResult());

assertEquals(new String(testString + "-lambda"), result1.get().getResult());
long endTime = System.currentTimeMillis();

log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class AsyncContextFunction implements Function<String, CompletableFuture<Void>> {
@Override
public CompletableFuture<Void> process(String input, Context context) {
Logger LOG = context.getLogger();
CompletableFuture<Void> future = new CompletableFuture();

// this method only delay a function execute.
Executors.newCachedThreadPool().submit(() -> {
try {
Thread.sleep(500);
} catch (Exception e) {
LOG.error("Exception when Thread.sleep", e);
future.completeExceptionally(e);
}

String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
String funcName = context.getFunctionName();

String logMessage = String
.format("A message with value of \"%s\" has arrived on one of the following topics: %s\n",
input, inputTopics);
LOG.info(logMessage);

String metricName = String.format("function-%s-messages-received", funcName);
context.recordMetric(metricName, 1);

future.complete(null);
});

return future;
}
}
Loading

0 comments on commit 7cd28b9

Please sign in to comment.