Skip to content

Commit

Permalink
[FLINK-14466][runtime] Let YarnJobClusterEntrypoint use user code cla…
Browse files Browse the repository at this point in the history
…ss loader

FileJobGraphRetriever adds the user class path to JobGraph if the user classpath
is not empty.

The system class path does not include the user jars in YARN per-job mode when
the user sets yarn.per-job-cluster.include-user-jar to "DISABLED". In that case
the user jars are loaded by FlinkUserCodeClassLoaders.

Check if there is a ship directory whose name is the same as
DEFAULT_FLINK_USR_LIB_DIR. Throw an IllegalArgumentException if there is one when
userJarInclusion == DISABLED.

This closes apache#10152.
  • Loading branch information
guoweiM authored and GJL committed Nov 19, 2019
1 parent 974f4da commit 0a9db1e
Show file tree
Hide file tree
Showing 13 changed files with 481 additions and 152 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
<td><h5>yarn.per-job-cluster.include-user-jar</h5></td>
<td style="word-wrap: break-word;">"ORDER"</td>
<td>String</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). "DISABLED" means the user-jars are excluded from the system class path.</td>
</tr>
<tr>
<td><h5>yarn.properties-file.location</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -94,12 +95,12 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
schedulerConfiguration),
FileJobGraphRetriever.createFrom(configuration));
FileJobGraphRetriever.createFrom(configuration, null));
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.FunctionUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
Expand All @@ -37,6 +38,7 @@
public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {

/** User classpaths in relative form to the working directory. */
@Nonnull
private final Collection<URL> userClassPaths;

protected AbstractUserClassPathJobGraphRetriever(@Nullable File jobDir) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,24 @@
import org.apache.flink.util.FlinkException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@link JobGraphRetriever} implementation which retrieves the {@link JobGraph} from
* a file on disk.
*/
public class FileJobGraphRetriever implements JobGraphRetriever {
public class FileJobGraphRetriever extends AbstractUserClassPathJobGraphRetriever {

public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = ConfigOptions
.key("internal.jobgraph-path")
Expand All @@ -45,26 +51,39 @@ public class FileJobGraphRetriever implements JobGraphRetriever {
@Nonnull
private final String jobGraphFile;

public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
public FileJobGraphRetriever(@Nonnull String jobGraphFile, @Nullable File usrLibDir) throws IOException {
super(usrLibDir);
this.jobGraphFile = jobGraphFile;
}

@Override
public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
File fp = new File(jobGraphFile);
final File fp = new File(jobGraphFile);

try (FileInputStream input = new FileInputStream(fp);
ObjectInputStream obInput = new ObjectInputStream(input)) {

return (JobGraph) obInput.readObject();
final JobGraph jobGraph = (JobGraph) obInput.readObject();
addUserClassPathsToJobGraph(jobGraph);
return jobGraph;
} catch (FileNotFoundException e) {
throw new FlinkException("Could not find the JobGraph file.", e);
} catch (ClassNotFoundException | IOException e) {
throw new FlinkException("Could not load the JobGraph from file.", e);
}
}

public static FileJobGraphRetriever createFrom(Configuration configuration) {
return new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
/**
 * Replaces the classpaths of the given job graph with a new list that contains the
 * classpaths the graph already had (if any) followed by the user classpaths of this
 * retriever.
 *
 * @param jobGraph the job graph whose classpaths are extended
 */
private void addUserClassPathsToJobGraph(JobGraph jobGraph) {
	final List<URL> existingClassPaths = jobGraph.getClasspaths();
	final List<URL> mergedClassPaths = new ArrayList<>();

	// The graph may report no classpath list at all; only copy when one is present.
	if (existingClassPaths != null) {
		mergedClassPaths.addAll(existingClassPaths);
	}
	mergedClassPaths.addAll(getUserClassPaths());

	jobGraph.setClasspaths(mergedClassPaths);
}

/**
 * Creates a {@link FileJobGraphRetriever} that reads the job graph from the file configured
 * under {@link #JOB_GRAPH_FILE_PATH}.
 *
 * @param configuration configuration to read the job graph file path from; must not be null
 * @param usrLibDir optional directory passed on to the retriever's constructor; may be null
 * @return the newly created retriever
 * @throws IOException if constructing the retriever fails
 */
public static FileJobGraphRetriever createFrom(Configuration configuration, @Nullable File usrLibDir) throws IOException {
	checkNotNull(configuration, "configuration");

	final String jobGraphFilePath = configuration.getString(JOB_GRAPH_FILE_PATH);
	return new FileJobGraphRetriever(jobGraphFilePath, usrLibDir);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.entrypoint.component;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.nio.file.StandardOpenOption.CREATE;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

/**
 * Tests for the {@link FileJobGraphRetriever}.
 *
 * <p>A {@link JobGraph} carrying a single classpath entry is serialized to a temporary file once
 * for the whole class. Each test then retrieves the graph from that file and checks which
 * classpath URLs it reports.
 */
public class FileJobGraphRetrieverTest {

	/** Per-test scratch folder; hosts the user library directory handed to the retriever. */
	@Rule
	public final TemporaryFolder temporaryFolder = new TemporaryFolder();

	/** Class-wide scratch folder holding the serialized job graph and the jar it references. */
	@ClassRule
	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

	/** Read-only configuration pointing {@code JOB_GRAPH_FILE_PATH} at the serialized job graph. */
	private static Configuration configuration;

	/** The jar registered as classpath entry on the serialized job graph. */
	private static Path jarFileInJobGraph;

	@BeforeClass
	public static void init() throws IOException {
		final Path serializedJobGraph = TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath();
		jarFileInJobGraph = TEMPORARY_FOLDER.newFile("jar_file_in_job_graph.jar").toPath();

		final JobGraph graph = new JobGraph(
			new JobID(),
			"test",
			new JobVertex("source"),
			new JobVertex("target"));
		graph.setClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()));
		writeJobGraph(graph, serializedJobGraph);

		final Configuration mutableConfiguration = new Configuration();
		mutableConfiguration.setString(JOB_GRAPH_FILE_PATH.key(), serializedJobGraph.toString());
		configuration = new UnmodifiableConfiguration(mutableConfiguration);
	}

	@Test
	public void testRetrieveJobGraphWithJarInUsrLibDir() throws IOException, FlinkException {
		final File usrLibDir = temporaryFolder.newFolder("job");
		final File jarInUsrLibDir = Files.createFile(usrLibDir.toPath().resolve("jar_file_in_job_dir.jar")).toFile();

		// The jar in the user lib dir is expected relative to the current working directory.
		final Path relativeJarPath = FileUtils.relativizePath(
			FileUtils.getCurrentWorkingDirectory(),
			jarInUsrLibDir.toPath());

		final List<URL> expectedClasspaths = new ArrayList<>();
		expectedClasspaths.add(jarFileInJobGraph.toUri().toURL());
		expectedClasspaths.add(FileUtils.toURL(relativeJarPath));

		final JobGraph retrieved = FileJobGraphRetriever
			.createFrom(configuration, usrLibDir)
			.retrieveJobGraph(configuration);

		assertClasspaths(retrieved, expectedClasspaths);
	}

	@Test
	public void testRetrieveJobGraphWithoutUsrLibDir() throws IOException, FlinkException {
		final List<URL> expectedClasspaths = Collections.singletonList(jarFileInJobGraph.toUri().toURL());

		final JobGraph retrieved = FileJobGraphRetriever
			.createFrom(configuration, null)
			.retrieveJobGraph(configuration);

		assertClasspaths(retrieved, expectedClasspaths);
	}

	/** Serializes the given job graph into the given file via Java serialization. */
	private static void writeJobGraph(JobGraph graph, Path target) throws IOException {
		try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(target, CREATE))) {
			out.writeObject(graph);
		}
	}

	/** Asserts that the job graph reports exactly the expected classpath URLs, in any order. */
	private static void assertClasspaths(JobGraph actual, List<URL> expectedClasspaths) {
		assertThat(actual.getClasspaths(), containsInAnyOrder(expectedClasspaths.toArray()));
	}

}
Loading

0 comments on commit 0a9db1e

Please sign in to comment.