Skip to content

Commit

Permalink
Use classloaders to load Java functions (apache#4685)
Browse files Browse the repository at this point in the history
* Use classloading to load use code for functions
  • Loading branch information
jerrypeng authored Jul 13, 2019
1 parent d2164eb commit 6ff1bba
Show file tree
Hide file tree
Showing 27 changed files with 605 additions and 646 deletions.
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"


ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"

Expand Down
6 changes: 0 additions & 6 deletions distribution/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,6 @@
<version>${project.version}</version>
<!-- make sure the api examples are compiled before assembly -->
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- local-runner -->
Expand Down
2 changes: 2 additions & 0 deletions distribution/server/src/assemble/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@
<exclude>io.netty:netty-transport-native-epoll</exclude>
<exclude>io.netty:netty-transport-native-unix-common</exclude>

<exclude>org.apache.pulsar:pulsar-functions-runtime-all</exclude>

<!-- Already included in pulsar-zookeeper instrumented jar -->
<exclude>org.apache.zookeeper:zookeeper</exclude>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ static <T> T catchExceptions(SupplierWithException<T> s) {
@SuppressWarnings("unchecked")
static <T> Class<T> newClassInstance(String className) {
try {
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
try {
// when the API is loaded in the same classloader as the impl
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
} catch (Exception e) {
// when the API is loaded in a separate classloader as the impl
// the classloader that loaded the impl needs to be a child classloader of the classloader
// that loaded the API
return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(className);
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@
@Slf4j
public class JavaInstanceRunnable implements AutoCloseable, Runnable {

// The class loader that used for loading functions
private ClassLoader fnClassLoader;
private final InstanceConfig instanceConfig;
private final FunctionCacheManager fnCache;
private final String jarFile;
Expand Down Expand Up @@ -132,6 +130,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final Map<String, String> properties;

private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
Expand Down Expand Up @@ -166,12 +167,14 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
this.collectorRegistry = collectorRegistry;

this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
}

/**
* NOTE: this method should be called in the instance thread, in order to make class loading work.
*/
JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
JavaInstance setupJavaInstance() throws Exception {
// initialize the thread context
ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
Expand All @@ -181,18 +184,21 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());

// start the function thread
loadJars();
functionClassLoader = loadJars();

ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object object = Reflections.createInstance(
instanceConfig.getFunctionDetails().getClassName(),
clsLoader);
functionClassLoader);

if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}

// start the state table
setupStateTable();

ContextImpl contextImpl = setupContext();

// start the output producer
setupOutput(contextImpl);
// start the input consumer
Expand Down Expand Up @@ -225,8 +231,7 @@ public void run() {
this.instanceCache.getScheduledExecutorService(),
this.componentType);

ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
Expand Down Expand Up @@ -254,7 +259,9 @@ public void run() {
stats.processTimeStart();

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();
Expand Down Expand Up @@ -289,7 +296,8 @@ public void run() {
}
}

private void loadJars() throws Exception {
private ClassLoader loadJars() throws Exception {
ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
Expand All @@ -309,13 +317,12 @@ private void loadJars() throws Exception {
log.info("Initialize function class loader for function {} at function cache manager",
instanceConfig.getFunctionDetails().getName());

this.fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}

// make sure the function class loader is accessible thread-locally
Thread.currentThread().setContextClassLoader(fnClassLoader);
return fnClassLoader;
}

private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
Expand Down Expand Up @@ -425,23 +432,33 @@ private void processResult(Record srcRecord,
}

private void sendOutputMessage(Record srcRecord, Object output) {
if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
throw new RuntimeException(e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
}

private Record readInput() {
Record record;
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}

// check record is valid
Expand All @@ -466,19 +483,29 @@ synchronized public void close() {
}

if (source != null) {
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
source.close();
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
source = null;
}

if (sink != null) {
if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
sink.close();
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
sink = null;
}
Expand Down Expand Up @@ -667,11 +694,11 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);
}

Class<?>[] typeArgs;
Expand All @@ -683,11 +710,22 @@ public void setupInput(ContextImpl contextImpl) throws Exception {
}
this.source = (Source<?>) object;

if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(this.functionClassLoader);
}
try {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {
}.getType()), contextImpl);
}
} catch (Exception e) {
log.error("Source open produced uncaught exception: ", e);
throw e;
} finally {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}

Expand All @@ -713,24 +751,36 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {

pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());

object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats);
object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
}
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);
}

if (object instanceof Sink) {
this.sink = (Sink) object;
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
if (sinkSpec.getConfigs().isEmpty()) {
this.sink.open(new HashMap<>(), contextImpl);
} else {
this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {}.getType()), contextImpl);

if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(this.functionClassLoader);
}
try {
if (sinkSpec.getConfigs().isEmpty()) {
this.sink.open(new HashMap<>(), contextImpl);
} else {
this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {
}.getType()), contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
throw e;
} finally {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> {
private final PulsarClient client;
private final PulsarSinkConfig pulsarSinkConfig;
private final Map<String, String> properties;
private final ClassLoader functionClassLoader;
private ComponentStatsManager stats;

@VisibleForTesting
Expand Down Expand Up @@ -237,12 +238,14 @@ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
}
}

public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) {
public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties,
ComponentStatsManager stats, ClassLoader functionClassLoader) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client);
this.properties = properties;
this.stats = stats;
this.functionClassLoader = functionClassLoader;
}

@Override
Expand Down Expand Up @@ -314,9 +317,7 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
return (Schema<T>) Schema.BYTES;
}

Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(),
Thread.currentThread().getContextClassLoader());

Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
if (Void.class.equals(typeArg)) {
// return type is 'void', so there's no schema to check
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,18 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final PulsarClient pulsarClient;
private final PulsarSourceConfig pulsarSourceConfig;
private final Map<String, String> properties;
private final ClassLoader functionClassLoader;
private List<String> inputTopics;
private List<Consumer<T>> inputConsumers = Collections.emptyList();
private final TopicSchema topicSchema;

public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
ClassLoader functionClassLoader) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient);
this.properties = properties;
this.functionClassLoader = functionClassLoader;
}

@Override
Expand Down Expand Up @@ -147,7 +150,7 @@ Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundExcept
Map<String, ConsumerConfig<T>> configs = new TreeMap<>();

Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);

checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");

Expand Down
Loading

0 comments on commit 6ff1bba

Please sign in to comment.