Skip to content

Commit

Permalink
[FLINK-27658][runtime] SafetyNetWrapperClassLoader expose addURL meth…
Browse files Browse the repository at this point in the history
…od to allow registering jar dynamically
  • Loading branch information
lsyldliu authored and zhuzhurk committed Jun 14, 2022
1 parent d389a47 commit 2e37a06
Show file tree
Hide file tree
Showing 22 changed files with 247 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.SupplierWithException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FlinkUserCodeClassLoaders.ParentFirstClassLoader;

import org.apache.commons.cli.Options;
import org.junit.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.util;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.annotation.Internal;

import java.io.File;
import java.io.IOException;
Expand All @@ -30,6 +30,7 @@
* Utilities for information with respect to class loaders, specifically class loaders for the
* dynamic loading of user defined classes.
*/
@Internal
public final class ClassLoaderUtil {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolv
throws ClassNotFoundException {
return super.loadClass(name, resolve);
}

@Override
protected void addURL(URL url) {
super.addURL(url);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
* limitations under the License.
*/

package org.apache.flink.runtime.execution.librarycache;
package org.apache.flink.util;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FlinkUserCodeClassLoader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +32,7 @@
import java.util.function.Consumer;

/** Gives the URLClassLoader a nicer name for debugging purposes. */
@Internal
public class FlinkUserCodeClassLoaders {

private FlinkUserCodeClassLoaders() {}
Expand Down Expand Up @@ -110,6 +110,7 @@ public static ResolveOrder fromString(String resolveOrder) {
/**
* Regular URLClassLoader that first loads from the parent and only after that from the URLs.
*/
@Internal
public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {

ParentFirstClassLoader(
Expand All @@ -130,7 +131,8 @@ public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
* delegate is nulled and can be garbage collected. Additional class resolution will be resolved
* solely through the bootstrap classloader and most likely result in ClassNotFound exceptions.
*/
private static class SafetyNetWrapperClassLoader extends URLClassLoader implements Closeable {
@Internal
public static class SafetyNetWrapperClassLoader extends URLClassLoader implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);

Expand Down Expand Up @@ -178,6 +180,11 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE
return ensureInner().loadClass(name, resolve);
}

@Override
public void addURL(URL url) {
ensureInner().addURL(url);
}

@Override
public URL getResource(String name) {
return ensureInner().getResource(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import org.apache.flink.util.IOUtils;
package org.apache.flink.util;

import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,22 @@
* limitations under the License.
*/

package org.apache.flink.runtime.execution.librarycache;

import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
package org.apache.flink.util;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.assertj.core.api.Assertions.fail;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -53,52 +46,23 @@ public class FlinkUserCodeClassLoadersTest extends TestLogger {

@Rule public ExpectedException expectedException = ExpectedException.none();

@Test
public void testMessageDecodingWithUnavailableClass() throws Exception {
final ClassLoader systemClassLoader = getClass().getClassLoader();

final String className = "UserClass";
final URLClassLoader userClassLoader =
ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(),
className + ".java",
"import java.io.Serializable;\n"
+ "public class "
+ className
+ " implements Serializable {}");

RemoteRpcInvocation method =
new RemoteRpcInvocation(
className,
"test",
new Class<?>[] {
int.class, Class.forName(className, false, userClassLoader)
},
new Object[] {
1, Class.forName(className, false, userClassLoader).newInstance()
});

SerializedValue<RemoteRpcInvocation> serializedMethod = new SerializedValue<>(method);

expectedException.expect(ClassNotFoundException.class);
expectedException.expect(
allOf(
isA(ClassNotFoundException.class),
hasProperty(
"suppressed",
hasItemInArray(
allOf(
isA(ClassNotFoundException.class),
hasProperty(
"message",
containsString(
"Could not deserialize 1th parameter type of method test(int, ...).")))))));

RemoteRpcInvocation deserializedMethod =
serializedMethod.deserializeValue(systemClassLoader);
deserializedMethod.getMethodName();

userClassLoader.close();
public static final String USER_CLASS = "UserClass";
public static final String USER_CLASS_CODE =
"import java.io.Serializable;\n"
+ "public class "
+ USER_CLASS
+ " implements Serializable {}";

private static File userJar;

@BeforeClass
public static void prepare() throws Exception {
userJar =
UserClassLoaderJarTestUtils.createJarFile(
temporaryFolder.newFolder("test-jar"),
"test-classloader.jar",
USER_CLASS,
USER_CLASS_CODE);
}

@Test
Expand Down Expand Up @@ -292,6 +256,92 @@ public void testParallelCapable() {
assertTrue(TestParentFirstClassLoader.isParallelCapable);
}

@Test
public void testParentFirstClassLoadingByAddURL() throws Exception {
// collect the libraries / class folders with RocksDB related code: the state backend and
// RocksDB itself
final URL childCodePath = getClass().getProtectionDomain().getCodeSource().getLocation();

final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader parentClassLoader =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createChildFirstClassLoader(childCodePath, getClass().getClassLoader());
final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader childClassLoader1 =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createParentFirstClassLoader(childCodePath, parentClassLoader);
final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader childClassLoader2 =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createParentFirstClassLoader(childCodePath, parentClassLoader);

// test class loader before add user jar ulr to ClassLoader
assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
assertClassNotFoundException(USER_CLASS, false, childClassLoader2);

// only add jar url to parent ClassLoader
parentClassLoader.addURL(userJar.toURI().toURL());

// test class loader after add jar url
final Class<?> clazz1 = Class.forName(USER_CLASS, false, parentClassLoader);
final Class<?> clazz2 = Class.forName(USER_CLASS, false, childClassLoader1);
final Class<?> clazz3 = Class.forName(USER_CLASS, false, childClassLoader2);

assertEquals(clazz1, clazz2);
assertEquals(clazz1, clazz3);

parentClassLoader.close();
childClassLoader1.close();
childClassLoader2.close();
}

@Test
public void testChildFirstClassLoadingByAddURL() throws Exception {

// collect the libraries / class folders with RocksDB related code: the state backend and
// RocksDB itself
final URL childCodePath = getClass().getProtectionDomain().getCodeSource().getLocation();

final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader parentClassLoader =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createChildFirstClassLoader(childCodePath, getClass().getClassLoader());
final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader childClassLoader1 =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createChildFirstClassLoader(childCodePath, parentClassLoader);
final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader childClassLoader2 =
(FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
createChildFirstClassLoader(childCodePath, parentClassLoader);

// test class loader before add user jar ulr to ClassLoader
assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
assertClassNotFoundException(USER_CLASS, false, childClassLoader2);

// only add jar url to child ClassLoader
URL userJarURL = userJar.toURI().toURL();
childClassLoader1.addURL(userJarURL);
childClassLoader2.addURL(userJarURL);

// test class loader after add jar url
assertClassNotFoundException(USER_CLASS, false, parentClassLoader);

final Class<?> clazz1 = Class.forName(USER_CLASS, false, childClassLoader1);
final Class<?> clazz2 = Class.forName(USER_CLASS, false, childClassLoader2);

assertNotEquals(clazz1, clazz2);

parentClassLoader.close();
childClassLoader1.close();
childClassLoader2.close();
}

private void assertClassNotFoundException(
String className, boolean initialize, ClassLoader classLoader) {
try {
Class.forName(className, initialize, classLoader);
fail("Should fail.");
} catch (ClassNotFoundException e) {
}
}

private static class TestParentFirstClassLoader
extends FlinkUserCodeClassLoaders.ParentFirstClassLoader {
public static boolean isParallelCapable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.table.utils;

import org.apache.flink.util.FileUtils;
package org.apache.flink.util;

import javax.tools.DiagnosticCollector;
import javax.tools.JavaCompiler;
Expand All @@ -35,7 +33,12 @@
import java.util.jar.JarOutputStream;

/** Mainly used for testing classloading. */
public class TestUserClassLoaderJar {
public class UserClassLoaderJarTestUtils {

/** Private constructor to prevent instantiation. */
private UserClassLoaderJarTestUtils() {
throw new RuntimeException();
}

/** Pack the generated UDF class into a JAR and return the path of the JAR. */
public static File createJarFile(File tmpDir, String jarName, String className, String javaCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.testutils.FilteredClassLoader;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.util.FlinkUserCodeClassLoaders;

import com.esotericsoftware.kryo.Kryo;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoader;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
Expand All @@ -34,6 +33,7 @@
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import javax.annotation.Nonnull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;

Expand Down
Loading

0 comments on commit 2e37a06

Please sign in to comment.