Skip to content

Commit

Permalink
[Functions] Process async results in the same Java runnable thread (a…
Browse files Browse the repository at this point in the history
…pache#10618)

* [Functions] Process async results in the same Java runnable thread

*Motivation*

After introducing the support for async functions, the java function processing semantic is not enforced.
For example, if it fails to write a sink, it doesn't fail the java instance or fail the message. Hence it keeps
receiving messages but never ack or nack.

*Modification*

Change the way how aysnc function requests are processed to fix the issues we have seen in Kinesis connector.

* Process the async results in the different thread
  • Loading branch information
sijie authored May 18, 2021
1 parent 8183a1e commit 1cbd5b4
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
*/
package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

import java.util.Map;

/**
* This is the Java Instance. This is started by the runtimeSpawner using the JavaInstanceClient
* program if invoking via a process based invocation or using JavaInstance using a thread
Expand All @@ -40,6 +43,12 @@
@Slf4j
public class JavaInstance implements AutoCloseable {

@Data
public static class AsyncFuncRequest {
private final Record record;
private final CompletableFuture processResult;
}

@Getter(AccessLevel.PACKAGE)
private final ContextImpl context;
private Function function;
Expand All @@ -49,7 +58,7 @@ public class JavaInstance implements AutoCloseable {
private final InstanceConfig instanceConfig;
private final ExecutorService executor;
@Getter
private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;
private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests;

public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {

Expand All @@ -66,12 +75,18 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
}
}

public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
@VisibleForTesting
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
return handleMessage(record, input, (rec, result) -> {}, cause -> {});
}

public JavaExecutionResult handleMessage(Record<?> record, Object input,
BiConsumer<Record, JavaExecutionResult> asyncResultConsumer,
Consumer<Throwable> asyncFailureHandler) {
if (context != null) {
context.setCurrentMessageContext(record);
}

final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
JavaExecutionResult executionResult = new JavaExecutionResult();

final Object output;
Expand All @@ -84,45 +99,63 @@ public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Ob
}
} catch (Exception ex) {
executionResult.setUserException(ex);
future.complete(executionResult);
return future;
return executionResult;
}

if (output instanceof CompletableFuture) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
);
try {
pendingAsyncRequests.put((CompletableFuture) output);
pendingAsyncRequests.put(request);
((CompletableFuture) output).whenCompleteAsync((res, cause) -> {
try {
processAsyncResults(asyncResultConsumer);
} catch (Throwable innerException) {
// the thread used for processing async results failed
asyncFailureHandler.accept(innerException);
}
}, executor);
return null;
} catch (InterruptedException ie) {
log.warn("Exception while put Async requests", ie);
executionResult.setUserException(ie);
future.complete(executionResult);
return future;
return executionResult;
}

((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> {
if (log.isDebugEnabled()) {
log.debug("Got result async: object: {}, throwable: {}", obj, throwable);
}

if (throwable != null) {
executionResult.setUserException(new Exception((Throwable)throwable));
pendingAsyncRequests.remove(output);
future.complete(executionResult);
return;
}
executionResult.setResult(obj);
pendingAsyncRequests.remove(output);
future.complete(executionResult);
}, executor);
} else {
if (log.isDebugEnabled()) {
log.debug("Got result: object: {}", output);
}
executionResult.setResult(output);
future.complete(executionResult);
return executionResult;
}
}

private void processAsyncResults(BiConsumer<Record, JavaExecutionResult> resultConsumer)
throws InterruptedException {
AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);
JavaExecutionResult execResult = new JavaExecutionResult();

try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception) {
execResult.setUserException((Exception) e.getCause());
} else {
execResult.setUserException(new Exception(e.getCause()));
}
}

resultConsumer.accept(asyncResult.getRecord(), execResult);

// peek the next result
asyncResult = pendingAsyncRequests.peek();
}

return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.Getter;
Expand All @@ -51,6 +51,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.instance.JavaInstance.AsyncFuncRequest;
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.state.StateManager;
Expand Down Expand Up @@ -241,6 +242,8 @@ public void run() {
try {
setup();

Thread currentThread = Thread.currentThread();

while (true) {
currentRecord = readInput();

Expand All @@ -255,7 +258,7 @@ public void run() {
}

addLogTopicHandler();
CompletableFuture<JavaExecutionResult> result;
JavaExecutionResult result;

// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());
Expand All @@ -265,19 +268,19 @@ public void run() {

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
result = javaInstance.handleMessage(
currentRecord, currentRecord.getValue(), this::handleResult,
cause -> currentThread.interrupt());
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();

removeLogTopicHandler();

try {
processResult(currentRecord, result);
} catch (Exception e) {
log.warn("Failed to process result of message {}", currentRecord, e);
currentRecord.fail();
if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
}
}
} catch (Throwable t) {
Expand Down Expand Up @@ -319,28 +322,29 @@ private void setupStateStore() throws Exception {
}
}

private void processResult(Record srcRecord,
CompletableFuture<JavaExecutionResult> result) throws Exception {
result.whenComplete((result1, throwable) -> {
if (throwable != null || result1.getUserException() != null) {
Throwable t = throwable != null ? throwable : result1.getUserException();
log.warn("Encountered exception when processing message {}",
srcRecord, t);
stats.incrUserExceptions(t);
srcRecord.fail();
private void processAsyncResults() throws InterruptedException {

}

private void handleResult(Record srcRecord, JavaExecutionResult result) {
if (result.getUserException() != null) {
Exception t = result.getUserException();
log.warn("Encountered exception when processing message {}",
srcRecord, t);
stats.incrUserExceptions(t);
srcRecord.fail();
} else {
if (result.getResult() != null) {
sendOutputMessage(srcRecord, result.getResult());
} else {
if (result1.getResult() != null) {
sendOutputMessage(srcRecord, result1.getResult());
} else {
if (instanceConfig.getFunctionDetails().getAutoAck()) {
// the function doesn't produce any result or the user doesn't want the result.
srcRecord.ack();
}
if (instanceConfig.getFunctionDetails().getAutoAck()) {
// the function doesn't produce any result or the user doesn't want the result.
srcRecord.ack();
}
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
});
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
}

private void sendOutputMessage(Record srcRecord, Object output) {
Expand All @@ -352,7 +356,8 @@ private void sendOutputMessage(Record srcRecord, Object output) {
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
throw new RuntimeException(e);
// fail the source record
srcRecord.fail();
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
Expand Down
Loading

0 comments on commit 1cbd5b4

Please sign in to comment.