Skip to content

Commit

Permalink
Optimize built-in source/sink startup Part 2 (apache#9500)
Browse files Browse the repository at this point in the history
* Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation Part 2. Allow ThreadRuntime to used cached built-in connectors instead of unpacking and loading again.

Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Feb 8, 2021
1 parent f4f31e2 commit c27f7a3
Show file tree
Hide file tree
Showing 23 changed files with 155 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
Expand All @@ -85,8 +87,6 @@
public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final InstanceConfig instanceConfig;
private final FunctionCacheManager fnCache;
private final String jarFile;

// input topic consumer & output topic producer
private final PulsarClientImpl client;
Expand Down Expand Up @@ -124,7 +124,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;
private String narExtractionDirectory;

// a flog to determine if member variables have been initialized as part of setup().
// used for out of band API calls like operations involving stats
Expand All @@ -134,23 +133,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private ReadWriteLock statsLock = new ReentrantReadWriteLock();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
String narExtractionDirectory) {
ClassLoader functionClassLoader) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
this.functionClassLoader = functionClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -197,9 +192,6 @@ synchronized private void setup() throws Exception {
log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());

// start the function thread
functionClassLoader = loadJars();

Object object;
if (instanceConfig.getFunctionDetails().getClassName().equals(org.apache.pulsar.functions.windowing.WindowFunctionExecutor.class.getName())) {
object = Reflections.createInstance(
Expand Down Expand Up @@ -305,35 +297,6 @@ public void run() {
}
}

private ClassLoader loadJars() throws Exception {
ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
jarFile, narExtractionDirectory);
} catch (FileNotFoundException e) {
// create the function class loader
fnCache.registerFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
Arrays.asList(jarFile),
Collections.emptyList());
}

log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));

fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}

return fnClassLoader;
}

private void setupStateStore() throws Exception {
this.stateManager = new InstanceStateManager();

Expand Down Expand Up @@ -475,14 +438,7 @@ synchronized public void close() {
stateStoreProvider.close();
}

if (instanceCache != null) {
// once the thread quits, clean up the instance
fnCache.unregisterFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName());
log.info("Unloading JAR files for function {}", instanceConfig);
instanceCache = null;
}
instanceCache = null;

if (logAppender != null) {
removeLogTopicAppender(LoggerContext.getContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static InstanceConfig createInstanceConfig(String outputSerde) {
private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
config, null, null, null, null, null, null, null, null);
config, null, null, null, null, null, null);
return javaInstanceRunnable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;

import java.util.Optional;
Expand All @@ -37,6 +38,7 @@ public interface RuntimeFactory extends AutoCloseable {
void initialize(WorkerConfig workerConfig,
AuthenticationConfig authenticationConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> authProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -129,6 +130,7 @@ public boolean externallyManaged() {
@Override
public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> functionAuthProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;

import java.nio.file.Paths;
Expand Down Expand Up @@ -94,6 +95,7 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
@Override
public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> authProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) {
ProcessRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

package org.apache.pulsar.functions.runtime.thread;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.prometheus.client.CollectorRegistry;
Expand All @@ -28,6 +33,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
Expand All @@ -36,6 +42,8 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.ConnectorsManager;

/**
* A function container implemented using java thread.
Expand All @@ -60,6 +68,8 @@ public class ThreadRuntime implements Runtime {
private SecretsProvider secretsProvider;
private CollectorRegistry collectorRegistry;
private String narExtractionDirectory;
private final Optional<ConnectorsManager> connectorsManager;

ThreadRuntime(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
Expand All @@ -69,7 +79,8 @@ public class ThreadRuntime implements Runtime {
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
String narExtractionDirectory) {
String narExtractionDirectory,
Optional<ConnectorsManager> connectorsManager) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java Runtime");
Expand All @@ -84,34 +95,82 @@ public class ThreadRuntime implements Runtime {
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
fnCache,
jarFile,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
secretsProvider,
collectorRegistry,
narExtractionDirectory);
this.connectorsManager = connectorsManager;
}

private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
String jarFile,
String narExtractionDirectory,
FunctionCacheManager fnCache,
Optional<ConnectorsManager> connectorsManager) throws Exception {

if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())
&& connectorsManager.isPresent()) {
switch (InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) {
case SOURCE:
return connectorsManager.get().getConnector(
instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
case SINK:
return connectorsManager.get().getConnector(
instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
default:
return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
}
} else {
return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
}
}

private static ClassLoader loadJars(String jarFile,
InstanceConfig instanceConfig,
String narExtractionDirectory,
FunctionCacheManager fnCache) throws Exception {
ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
jarFile, narExtractionDirectory);
} catch (FileNotFoundException e) {
// create the function class loader
fnCache.registerFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
Arrays.asList(jarFile),
Collections.emptyList());
}

log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));

fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}

return fnClassLoader;
}

/**
* The core logic that initialize the thread container and executes the function.
*/
@Override
public void start() {
public void start() throws Exception {

// extract class loader for function
ClassLoader functionClassLoader = getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory, fnCache, connectorsManager);

// re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
fnCache,
jarFile,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
secretsProvider,
collectorRegistry,
narExtractionDirectory);
functionClassLoader);
log.info("ThreadContainer starting function with instance config {}", instanceConfig);
this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
String.format("%s-%s",
Expand Down Expand Up @@ -145,6 +204,12 @@ public void stop() {
}
// make sure JavaInstanceRunnable is closed
this.javaInstanceRunnable.close();

log.info("Unloading JAR files for function {}", instanceConfig);
// once the thread quits, clean up the instance
fnCache.unregisterFunctionInstance(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName());
}
}

Expand Down
Loading

0 comments on commit c27f7a3

Please sign in to comment.