Skip to content

Commit

Permalink
[FLINK-11533] [container] Find job jar on classpath
Browse files Browse the repository at this point in the history
[pr-review] Address comments
  • Loading branch information
uce committed Mar 1, 2019
1 parent 34aa3f5 commit 1f1cc86
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.container.entrypoint.JarManifestParser.JarFileWithEntryClass;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand All @@ -35,6 +37,13 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -44,9 +53,6 @@
class ClassPathJobGraphRetriever implements JobGraphRetriever {

private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class);
private static final String JAVA_CLASS_PATH = "java.class.path";
private static final String PATH_SEPARATOR = "path.separator";
private static final String DEFAULT_PATH_SEPARATOR = ":";

@Nonnull
private final JobID jobId;
Expand All @@ -60,15 +66,29 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
@Nullable
private final String jobClassName;

@Nonnull
private final Supplier<Iterable<File>> jarsOnClassPath;

ClassPathJobGraphRetriever(
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName) {
this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE);
}

@VisibleForTesting
ClassPathJobGraphRetriever(
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments,
@Nullable String jobClassName,
@Nonnull Supplier<Iterable<File>> jarsOnClassPath) {
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
this.jobClassName = jobClassName;
this.jarsOnClassPath = requireNonNull(jarsOnClassPath, "jarsOnClassPath");
}

@Override
Expand All @@ -91,12 +111,58 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
}

private PackagedProgram createPackagedProgram() throws FlinkException {
final String entryClass = getJobClassNameOrScanClassPath();
try {
final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
final Class<?> mainClass = getClass().getClassLoader().loadClass(entryClass);
return new PackagedProgram(mainClass, programArguments);
} catch (ClassNotFoundException | ProgramInvocationException e) {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}

private String getJobClassNameOrScanClassPath() throws FlinkException {
if (jobClassName != null) {
return jobClassName;
}

try {
return scanClassPathForJobJar();
} catch (IOException | NoSuchElementException | IllegalArgumentException e) {
throw new FlinkException("Failed to find job JAR on class path. Please provide the job class name explicitly.", e);
}
}

private String scanClassPathForJobJar() throws IOException {
LOG.info("Scanning class path for job JAR");
JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jarsOnClassPath.get());

LOG.info("Using {} as job jar", jobJar);
return jobJar.getEntryClass();
}

@VisibleForTesting
enum JarsOnClassPath implements Supplier<Iterable<File>> {
INSTANCE;

static final String JAVA_CLASS_PATH = "java.class.path";
static final String PATH_SEPARATOR = "path.separator";
static final String DEFAULT_PATH_SEPARATOR = ":";

@Override
public Iterable<File> get() {
String classPath = System.getProperty(JAVA_CLASS_PATH, "");
String pathSeparator = System.getProperty(PATH_SEPARATOR, DEFAULT_PATH_SEPARATOR);

return Arrays.stream(classPath.split(pathSeparator))
.filter(JarsOnClassPath::notNullAndNotEmpty)
.map(File::new)
.filter(File::isFile)
.collect(Collectors.toList());
}

private static boolean notNullAndNotEmpty(String string) {
return string != null && !string.equals("");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ static JarFileWithEntryClass findOnlyEntryClass(Iterable<File> jarFiles) throws
* <li>{@link PackagedProgram#MANIFEST_ATTRIBUTE_MAIN_CLASS}</li>
* </ol>
*
* @param jarFileZO JAR file to parse
* @param jarFile JAR file to parse
* @return Optional holding entry class
* @throws IOException If there is an error accessing the JAR
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,26 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.JarsOnClassPath;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
Expand All @@ -38,6 +50,9 @@
*/
public class ClassPathJobGraphRetrieverTest extends TestLogger {

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};

@Test
Expand All @@ -60,6 +75,40 @@ public void testJobGraphRetrieval() throws FlinkException {
assertEquals(jobGraph.getJobID(), jobId);
}

@Test
public void testJobGraphRetrievalFromJar() throws FlinkException, FileNotFoundException {
final File testJar = TestJob.getTestJobJar();
final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
new JobID(),
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS,
// No class name specified, but the test JAR "is" on the class path
null,
() -> Collections.singleton(testJar));

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration());

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
}

@Test
public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath() throws FlinkException, FileNotFoundException {
final File testJar = new File("non-existing");

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
new JobID(),
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS,
// Both a class name is specified and a JAR "is" on the class path
// The class name should have precedence.
TestJob.class.getCanonicalName(),
() -> Collections.singleton(testJar));

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(new Configuration());

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
}

@Test
public void testSavepointRestoreSettings() throws FlinkException {
final Configuration configuration = new Configuration();
Expand All @@ -77,4 +126,54 @@ public void testSavepointRestoreSettings() throws FlinkException {
assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
assertEquals(jobGraph.getJobID(), jobId);
}

@Test
public void testJarFromClassPathSupplierSanityCheck() {
Iterable<File> jarFiles = JarsOnClassPath.INSTANCE.get();

// Junit executes this test, so it should be returned as part of JARs on the class path
assertThat(jarFiles, hasItem(hasProperty("name", containsString("junit"))));
}

@Test
public void testJarFromClassPathSupplier() throws IOException {
final File file1 = temporaryFolder.newFile();
final File file2 = temporaryFolder.newFile();
final File directory = temporaryFolder.newFolder();

// Mock java.class.path property. The empty strings are important as the shell scripts
// that prepare the Flink class path often have such entries.
final String classPath = javaClassPath(
"",
"",
"",
file1.getAbsolutePath(),
"",
directory.getAbsolutePath(),
"",
file2.getAbsolutePath(),
"",
"");

Iterable<File> jarFiles = setClassPathAndGetJarsOnClassPath(classPath);

assertThat(jarFiles, contains(file1, file2));
}

private static String javaClassPath(String... entries) {
String pathSeparator = System.getProperty(JarsOnClassPath.PATH_SEPARATOR);
return String.join(pathSeparator, entries);
}

private static Iterable<File> setClassPathAndGetJarsOnClassPath(String classPath) {
final String originalClassPath = System.getProperty(JarsOnClassPath.JAVA_CLASS_PATH);
try {
System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, classPath);
return JarsOnClassPath.INSTANCE.get();
} finally {
// Reset property
System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, originalClassPath);
}
}

}

0 comments on commit 1f1cc86

Please sign in to comment.