Skip to content

Commit

Permalink
Optimize batch source discovery and task ack (apache#8498)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Nov 10, 2020
1 parent 853bcf9 commit 5c756ee
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ private void sendOutputMessage(Record srcRecord, Object output) {
}
}

private Record readInput() {
private Record readInput() throws Exception {
Record record;
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
Expand All @@ -469,8 +469,8 @@ private Record readInput() {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
log.error("Encountered exception in source read", e);
throw e;
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.source.batch;

import com.google.gson.Gson;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -41,6 +42,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* BatchSourceExecutor wraps BatchSource as Source. Thus from Pulsar IO perspective, it is running a regular
Expand All @@ -63,13 +67,16 @@ public class BatchSourceExecutor<T> implements Source<T> {
private String batchSourceClassName;
private BatchSource<T> batchSource;
private String intermediateTopicName;
private volatile Exception currentError = null;
private volatile boolean isRunning = false;
private ExecutorService discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-source-discovery"));

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.config = config;
this.sourceContext = sourceContext;
this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(),
sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
this.getBatchSourceConfigs(config);
this.initializeBatchSource();
this.start();
Expand All @@ -78,14 +85,21 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
@Override
public Record<T> read() throws Exception {
while (true) {
if (currentError != null) {
throw currentError;
}
if (currentTask == null) {
retrieveNextTask();
prepareInternal();
currentTask = retrieveNextTask();
prepareInternal(currentTask);
}
Record<T> retval = batchSource.readNext();
if (retval == null) {
// signals end of this batch
intermediateTopicConsumer.acknowledge(currentTask.getMessageId());
intermediateTopicConsumer.acknowledgeAsync(currentTask.getMessageId()).exceptionally(throwable -> {
log.error("Encountered error when acknowledging completed task with id {}", currentTask.getMessageId(), throwable);
setCurrentError(throwable);
return null;
});
currentTask = null;
} else {
return retval;
Expand All @@ -95,7 +109,7 @@ public Record<T> read() throws Exception {

private void getBatchSourceConfigs(Map<String, Object> config) {
if (!config.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY)
|| !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
|| !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) {
throw new IllegalArgumentException("Batch Configs cannot be found");
}

Expand All @@ -108,18 +122,18 @@ private void initializeBatchSource() {
// First init the batchsource
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object userClassObject = Reflections.createInstance(
batchSourceClassName,
clsLoader);
batchSourceClassName,
clsLoader);
if (userClassObject instanceof BatchSource) {
batchSource = (BatchSource) userClassObject;
batchSource = (BatchSource) userClassObject;
} else {
throw new IllegalArgumentException("BatchSource does not implement the correct interface");
}

// next init the discovery triggerer
Object discoveryClassObject = Reflections.createInstance(
batchSourceConfig.getDiscoveryTriggererClassName(),
clsLoader);
batchSourceConfig.getDiscoveryTriggererClassName(),
clsLoader);
if (discoveryClassObject instanceof BatchSourceTriggerer) {
discoveryTriggerer = (BatchSourceTriggerer) discoveryClassObject;
} else {
Expand All @@ -128,22 +142,38 @@ private void initializeBatchSource() {
}

// Marks the executor as running, creates the intermediate topic consumer, opens the wrapped
// batch source, and on instance 0 only initializes and starts the discovery triggerer.
// NOTE(review): the "this.sourceContext);" argument line appears twice below; this looks like
// the old and new variants of the same line from a rendered diff - confirm against the real file.
private void start() throws Exception {
isRunning = true;
createIntermediateTopicConsumer();
batchSource.open(this.config, this.sourceContext);
// Only the instance whose id is 0 drives discovery; triggerDiscover is the callback handed to the triggerer.
if (sourceContext.getInstanceId() == 0) {
discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(),
this.sourceContext);
this.sourceContext);
discoveryTriggerer.start(this::triggerDiscover);
}
}

private void triggerDiscover(String discoveredEvent) {
try {
batchSource.discover((task) -> this.taskEater(discoveredEvent, task));
} catch (Exception e) {
log.error("Error on discover", e);
throw new RuntimeException(e);
volatile boolean discoverInProgress = false;
/**
 * Callback invoked by the discovery triggerer. Schedules one run of
 * {@code batchSource.discover(...)} on the discovery thread, unless a run is already in progress,
 * in which case the trigger is logged and ignored.
 *
 * <p>Errors raised by the discovery run are logged and recorded through
 * {@code setCurrentError}; an {@link InterruptedException} seen while {@code isRunning} is
 * false is ignored.
 *
 * @param discoveredEvent the event passed by the triggerer; forwarded to {@code taskEater}
 *                        together with each discovered task
 * @throws RuntimeException if the discovery thread refuses the task (for example because it
 *                          has already been shut down)
 */
private synchronized void triggerDiscover(String discoveredEvent) {
    if (discoverInProgress) {
        log.info("Discovery is already in progress");
        return;
    }
    discoverInProgress = true;

    try {
        // Run this code asynchronous so it doesn't block processing of the tasks
        discoveryThread.submit(() -> {
            try {
                batchSource.discover(task -> taskEater(discoveredEvent, task));
            } catch (Exception e) {
                if (isRunning || !(e instanceof InterruptedException)) {
                    log.error("Encountered error during task discovery", e);
                    setCurrentError(e);
                }
            } finally {
                discoverInProgress = false;
            }
        });
    } catch (RuntimeException e) {
        // The task was never accepted, so its finally block will not run. Reset the flag here,
        // otherwise every later trigger would be dropped as "already in progress".
        discoverInProgress = false;
        throw e;
    }
}

private void taskEater(String discoveredEvent, byte[] task) {
Expand All @@ -153,16 +183,17 @@ private void taskEater(String discoveredEvent, byte[] task) {
properties.put("produceTime", String.valueOf(System.currentTimeMillis()));
TypedMessageBuilder<byte[]> message = sourceContext.newOutputMessage(intermediateTopicName, Schema.BYTES);
message.value(task).properties(properties);
// Note: we can only make this send async if the api returns a future to the connector so that errors can be handled by the connector
message.send();
} catch (Exception e) {
log.error("error writing discovered task to intermediate topic", e);
throw new RuntimeException("error writing discovered task to intermediate topic");
}
}

private void prepareInternal() {
private void prepareInternal(Message<byte[]> task) {
try {
batchSource.prepare(currentTask.getValue());
batchSource.prepare(task.getValue());
} catch (Exception e) {
log.error("Error on prepare", e);
throw new RuntimeException(e);
Expand All @@ -175,6 +206,7 @@ public void close() throws Exception {
}

private void stop() throws Exception {
isRunning = false;
Exception ex = null;
if (discoveryTriggerer != null) {
try {
Expand All @@ -185,6 +217,14 @@ private void stop() throws Exception {
}
discoveryTriggerer = null;
}

discoveryThread.shutdownNow();
try {
discoveryThread.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Shutdown of discovery thread was interrupted");
}

if (intermediateTopicConsumer != null) {
try {
intermediateTopicConsumer.close();
Expand Down Expand Up @@ -255,11 +295,24 @@ private void createIntermediateTopicConsumer() {
}
}


private void retrieveNextTask() throws Exception {
currentTask = intermediateTopicConsumer.receive();
return;
/**
 * Waits for the next task message on the intermediate topic.
 *
 * <p>The consumer is polled with a 5 second timeout rather than blocking indefinitely, so that
 * {@code currentError} is re-checked before every poll and rethrown as soon as it is set.
 *
 * @return the next task message; never {@code null}
 * @throws Exception the value of {@code currentError} if one has been recorded, or whatever
 *                   the consumer's {@code receive} call throws
 */
private Message<byte[]> retrieveNextTask() throws Exception {
    Message<byte[]> nextTask;
    do {
        if (currentError != null) {
            throw currentError;
        }
        // A null result means nothing arrived within the timeout: loop and check again.
        nextTask = intermediateTopicConsumer.receive(5, TimeUnit.SECONDS);
    } while (nextTask == null);
    return nextTask;
}

/**
 * Records a failure in {@code currentError} so that the reading code, which checks that field
 * and rethrows it, can surface the failure to its caller.
 *
 * @param error the failure to record; an {@link Exception} is stored as-is, any other
 *              {@link Throwable} is wrapped in a {@link RuntimeException}
 */
private void setCurrentError(Throwable error) {
    if (error instanceof Exception) {
        currentError = (Exception) error;
    } else {
        // Wrap the throwable itself rather than its cause: the cause is frequently null, and
        // using it would discard the actual error (type, message and stack trace).
        currentError = new RuntimeException(error);
    }
}
}

Loading

0 comments on commit 5c756ee

Please sign in to comment.