Skip to content

Commit

Permalink
[Functions] Fix classloader leaks (apache#12973)
Browse files Browse the repository at this point in the history
* Fix classloader leak in FunctionCommon.getClassLoaderFromPackage

* Fix classloader leak in SinksImpl and SourcesImpl

* Fix logic for shouldCloseClassLoader
  • Loading branch information
lhotari authored Dec 3, 2021
1 parent de51284 commit cab946b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
*/
package org.apache.pulsar.common.util;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import lombok.extern.slf4j.Slf4j;

/**
* Helper methods wrt Classloading.
*/
@Slf4j
public class ClassLoaderUtils {
/**
* Load a jar.
Expand Down Expand Up @@ -76,4 +80,14 @@ public static void implementsClass(String className, Class<?> klass, ClassLoader
String.format("%s does not implement %s", className, klass.getName()));
}
}

public static void closeClassLoader(ClassLoader classLoader) {
if (classLoader instanceof Closeable) {
try {
((Closeable) classLoader).close();
} catch (IOException e) {
log.error("Error closing classloader {}", classLoader, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,97 +382,114 @@ public static ClassLoader getClassLoaderFromPackage(
String narExtractionDirectory) {
String connectorClassName = className;
ClassLoader jarClassLoader = null;
boolean keepJarClassLoader = false;
ClassLoader narClassLoader = null;
boolean keepNarClassLoader = false;

Exception jarClassLoaderException = null;
Exception narClassLoaderException = null;

try {
jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
try {
narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
} catch (Exception e) {
narClassLoaderException = e;
}

// if connector class name is not provided, we can only try to load archive as a NAR
if (isEmpty(connectorClassName)) {
if (narClassLoader == null) {
throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
"Pulsar cannot determine if the package is a NAR package or JAR package. " +
"%s classname is not provided and attempts to load it as a NAR package produced the following error.",
capFirstLetter(componentType), capFirstLetter(componentType)),
narClassLoaderException);
}
try {
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} else {
connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
componentType.toString().toLowerCase()), e);
jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}

try {
narClassLoader.loadClass(connectorClassName);
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path", capFirstLetter(componentType), connectorClassName), e);
narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
} catch (Exception e) {
narClassLoaderException = e;
}

} else {
// if connector class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
// if connector class name is not provided, we can only try to load archive as a NAR
if (isEmpty(connectorClassName)) {
if (narClassLoader == null) {
throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
"Pulsar cannot determine if the package is a NAR package or JAR package. " +
"%s classname is not provided and attempts to load it as a NAR package produced " +
"the following error.",
capFirstLetter(componentType), capFirstLetter(componentType)),
narClassLoaderException);
}
try {
jarClassLoader.loadClass(connectorClassName);
return jarClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {

try {
narClassLoader.loadClass(connectorClassName);
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e1);
}
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} else {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path", capFirstLetter(componentType),
connectorClassName), e);
connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
componentType.toString().toLowerCase()), e);
}
} else if (narClassLoader != null) {

try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e1);
String.format("%s class %s must be in class path", capFirstLetter(componentType),
connectorClassName), e);
}

} else {
StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
+ " package does not have the correct format."
+ " Pulsar cannot determine if the package is a NAR package or JAR package.");
// if connector class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
try {
jarClassLoader.loadClass(connectorClassName);
keepJarClassLoader = true;
return jarClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {

try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e1);
}
} else {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path", capFirstLetter(componentType),
connectorClassName), e);
}
}
} else if (narClassLoader != null) {
try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e1);
}
} else {
StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
+ " package does not have the correct format."
+ " Pulsar cannot determine if the package is a NAR package or JAR package.");

if (jarClassLoaderException != null) {
errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
}
if (jarClassLoaderException != null) {
errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
}

if (narClassLoaderException != null) {
errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
}
if (narClassLoaderException != null) {
errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
}

throw new IllegalArgumentException(errorMsg.toString());
throw new IllegalArgumentException(errorMsg.toString());
}
}
} finally {
if (!keepJarClassLoader) {
ClassLoaderUtils.closeClassLoader(jarClassLoader);
}
if (!keepNarClassLoader) {
ClassLoaderUtils.closeClassLoader(narClassLoader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
Expand Down Expand Up @@ -732,19 +733,28 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
}
}

// if sink is not builtin, attempt to extract classloader from package file if it exists
if (classLoader == null && sinkPackageFile != null) {
classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
}
boolean shouldCloseClassLoader = false;
try {

if (classLoader == null) {
throw new IllegalArgumentException("Sink package is not provided");
}
// if sink is not builtin, attempt to extract classloader from package file if it exists
if (classLoader == null && sinkPackageFile != null) {
classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(),
sinkPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
shouldCloseClassLoader = true;
}

SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
if (classLoader == null) {
throw new IllegalArgumentException("Sink package is not provided");
}

SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
} finally {
if (shouldCloseClassLoader) {
ClassLoaderUtils.closeClassLoader(classLoader);
}
}
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
Expand Down Expand Up @@ -728,20 +729,28 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
}
}

// if source is not builtin, attempt to extract classloader from package file if it exists
if (classLoader == null && sourcePackageFile != null) {
classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
}
boolean shouldCloseClassLoader = false;
try {
// if source is not builtin, attempt to extract classloader from package file if it exists
if (classLoader == null && sourcePackageFile != null) {
classLoader = getClassLoaderFromPackage(sourceConfig.getClassName(),
sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
shouldCloseClassLoader = true;
}

if (classLoader == null) {
throw new IllegalArgumentException("Source package is not provided");
}
if (classLoader == null) {
throw new IllegalArgumentException("Source package is not provided");
}

SourceConfigUtils.ExtractedSourceDetails sourceDetails
= SourceConfigUtils.validateAndExtractDetails(
sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
SourceConfigUtils.ExtractedSourceDetails sourceDetails
= SourceConfigUtils.validateAndExtractDetails(
sourceConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
} finally {
if (shouldCloseClassLoader) {
ClassLoaderUtils.closeClassLoader(classLoader);
}
}
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
Expand Down

0 comments on commit cab946b

Please sign in to comment.