[FLINK-17632][yarn] Support to specify a remote path for job jar
This closes apache#12143.
wangyang0918 authored and kl0u committed May 15, 2020
1 parent 04bcc25 commit 681b211
Showing 7 changed files with 125 additions and 55 deletions.
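For orientation, a hedged sketch (not part of the commit) of how a remote job jar can be referenced after this change: in application mode the entries of PipelineOptions.JARS may point at a distributed file system, and the descriptor/uploader changes below register such paths as YARN local resources instead of copying them from the client. The class name, the yarn-application target string and the HDFS path are illustrative assumptions, not taken from the diff.

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;

public class RemoteJobJarConfigSketch {
    public static void main(String[] args) {
        final Configuration configuration = new Configuration();
        // YARN application mode: the cluster-side entry point resolves PipelineOptions.JARS
        configuration.set(DeploymentOptions.TARGET, "yarn-application");
        // the user jar lives on HDFS; with this commit it is registered as a
        // YARN local resource rather than uploaded from the client machine
        configuration.set(PipelineOptions.JARS,
            Collections.singletonList("hdfs:///flink/usrlib/my-job.jar"));
        System.out.println(configuration);
    }
}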
@@ -71,6 +71,16 @@ public void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion() t
createDefaultConfiguration(getTestingJar(), YarnConfigOptions.UserJarInclusion.DISABLED)));
}

@Test
public void testApplicationClusterWithRemoteUserJar() throws Exception {
final Path testingJar = getTestingJar();
final Path remoteJar = new Path(miniDFSCluster.getFileSystem().getHomeDirectory(), testingJar.getName());
miniDFSCluster.getFileSystem().copyFromLocalFile(testingJar, remoteJar);
runTest(
() -> deployApplication(
createDefaultConfiguration(remoteJar, YarnConfigOptions.UserJarInclusion.DISABLED)));
}

private void deployApplication(Configuration configuration) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

5 changes: 5 additions & 0 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -492,6 +492,11 @@ static ContainerLaunchContext createTaskExecutorContext(
return ctx;
}

static boolean isRemotePath(String path) throws IOException {
org.apache.flink.core.fs.Path flinkPath = new org.apache.flink.core.fs.Path(path);
return flinkPath.getFileSystem().isDistributedFS();
}
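
// Editor's sketch, not part of the commit: what the helper above is expected to
// report. The answer depends on which FileSystem implementation is bound to the
// path's scheme at runtime, so the values assume a typical HDFS-backed setup.
//
//   Utils.isRemotePath("hdfs:///user/flink/job.jar")  -> true  (hdfs:// resolves to a distributed FS)
//   Utils.isRemotePath("file:///tmp/job.jar")         -> false (the local file system is not distributed)
//   Utils.isRemotePath("/tmp/job.jar")                -> depends on the configured default file system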

private static List<YarnLocalResourceDescriptor> decodeYarnLocalResourceDescriptorListFromString(String resources) throws Exception {
final List<YarnLocalResourceDescriptor> resourceDescriptors = new ArrayList<>();
for (String shipResourceDescStr : resources.split(LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR)) {
@@ -38,6 +38,7 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
@@ -108,34 +109,41 @@ public void close() {
}

/**
* Uploads and registers a single resource and adds it to <tt>localResources</tt>.
* Registers a single local or remote resource and adds it to <tt>localResources</tt>.
* @param key the key to add the resource under
* @param localSrcPath local path to the file
* @param resourcePath path of the resource to be registered
* @param relativeDstPath the relative path at the target location
* (this will be prefixed by the application-specific directory)
* @param replicationFactor number of replications of a remote file to be created
* @return the descriptor of the registered resource
*/
YarnLocalResourceDescriptor registerSingleLocalResource(
final String key,
final Path localSrcPath,
final Path resourcePath,
final String relativeDstPath,
final int replicationFactor) throws IOException {

final FileStatus fileStatus = providedSharedLibs.get(localSrcPath.getName());
if (Utils.isRemotePath(resourcePath.toString())) {
final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath);
LOG.debug("Using remote file {} to register local resource", fileStatus.getPath());

final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
.fromFileStatus(key, fileStatus, LocalResourceVisibility.APPLICATION);
localResources.put(key, descriptor.toLocalResource());
return descriptor;
}

final FileStatus fileStatus = providedSharedLibs.get(resourcePath.getName());
if (fileStatus != null) {
LOG.debug("Using provided file {} instead of the local {}", fileStatus.getPath(), localSrcPath);

return new YarnLocalResourceDescriptor(
fileStatus.getPath().getName(),
fileStatus.getPath(),
fileStatus.getLen(),
fileStatus.getModificationTime(),
LocalResourceVisibility.PUBLIC);
LOG.debug("Using provided file {} instead of the local {}", fileStatus.getPath(), resourcePath);

final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
.fromFileStatus(fileStatus.getPath().getName(), fileStatus, LocalResourceVisibility.PUBLIC);
return descriptor;
}

final File localFile = new File(localSrcPath.toUri().getPath());
final Tuple2<Path, Long> remoteFileInfo = uploadLocalFileToRemote(localSrcPath, relativeDstPath, replicationFactor);
final File localFile = new File(resourcePath.toUri().getPath());
final Tuple2<Path, Long> remoteFileInfo = uploadLocalFileToRemote(resourcePath, relativeDstPath, replicationFactor);
final YarnLocalResourceDescriptor descriptor = new YarnLocalResourceDescriptor(
key,
remoteFileInfo.f0,
@@ -172,10 +180,11 @@ Tuple2<Path, Long> uploadLocalFileToRemote(

/**
* Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately. If a ship file is
* already a remote path, the upload is skipped.
*
* @param shipFiles
* files to upload
* local or remote files to register as Yarn local resources
* @param remotePaths
* paths of the remote resources (uploaded resources will be added)
* @param localResourcesDirectory
@@ -188,7 +197,7 @@ Tuple2<Path, Long> uploadLocalFileToRemote(
* @return list of class paths with the proper resource keys from the registration
*/
List<String> registerMultipleLocalResources(
final Collection<File> shipFiles,
final Collection<Path> shipFiles,
final List<Path> remotePaths,
final String localResourcesDirectory,
final List<YarnLocalResourceDescriptor> envShipResourceList,
@@ -197,23 +206,36 @@ List<String> registerMultipleLocalResources(
checkArgument(replicationFactor >= 1);
final List<Path> localPaths = new ArrayList<>();
final List<Path> relativePaths = new ArrayList<>();
for (File shipFile : shipFiles) {
if (shipFile.isDirectory()) {
// add directories to the classpath
final java.nio.file.Path shipPath = shipFile.toPath();
final java.nio.file.Path parentPath = shipPath.getParent();
Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
@Override
public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
localPaths.add(new Path(file.toUri()));
relativePaths.add(new Path(localResourcesDirectory, parentPath.relativize(file).toString()));
return FileVisitResult.CONTINUE;
for (Path shipFile : shipFiles) {
if (Utils.isRemotePath(shipFile.toString())) {
if (fileSystem.isDirectory(shipFile)) {
final URI parentURI = shipFile.getParent().toUri();
final RemoteIterator<LocatedFileStatus> iterable = fileSystem.listFiles(shipFile, true);
while (iterable.hasNext()) {
final Path current = iterable.next().getPath();
localPaths.add(current);
relativePaths.add(new Path(localResourcesDirectory, parentURI.relativize(current.toUri()).getPath()));
}
});
continue;
}
} else {
localPaths.add(new Path(shipFile.toURI()));
relativePaths.add(new Path(localResourcesDirectory, shipFile.getName()));
final File file = new File(shipFile.toUri().getPath());
if (file.isDirectory()) {
final java.nio.file.Path shipPath = file.toPath();
final java.nio.file.Path parentPath = shipPath.getParent();
Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
@Override
public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
localPaths.add(new Path(file.toUri()));
relativePaths.add(new Path(localResourcesDirectory, parentPath.relativize(file).toString()));
return FileVisitResult.CONTINUE;
}
});
continue;
}
}
localPaths.add(shipFile);
relativePaths.add(new Path(localResourcesDirectory, shipFile.getName()));
}

final Set<String> archives = new HashSet<>();
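
A hedged usage sketch (not from this commit) of the registration method above, written against the caller names that appear in the YarnClusterDescriptor hunks below; the two ship paths are invented for illustration:

final List<String> shipClassPaths = fileUploader.registerMultipleLocalResources(
    Arrays.asList(
        new Path("/opt/flink/connectors"),          // local directory: walked and uploaded to the staging directory
        new Path("hdfs:///flink/cache/udf-libs")),  // remote directory: listed on the cluster FS and registered in place
    paths,
    Path.CUR_DIR,
    envShipResourceList,
    fileReplication);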
@@ -736,14 +736,14 @@ private ApplicationReport startAppMaster(
1));
}

final Set<File> userJarFiles = new HashSet<>();
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet()));
userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet()));
}

final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(File::new).collect(Collectors.toSet()));
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}

int yarnFileReplication = yarnConfiguration.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
@@ -753,10 +753,9 @@ private ApplicationReport startAppMaster(
// only for per job mode
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
// only upload local files
if (!path.getFileSystem().isDistributedFS()) {
Path localPath = new Path(path.getPath());
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey(), fileReplication);
jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
@@ -775,7 +774,7 @@ private ApplicationReport startAppMaster(
// and upload the remaining dependencies as local resources with APPLICATION visibility.
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies = fileUploader.registerMultipleLocalResources(
systemShipFiles,
systemShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
paths,
Path.CUR_DIR,
envShipResourceList,
Expand All @@ -784,12 +783,13 @@ private ApplicationReport startAppMaster(

// upload and register ship-only files
fileUploader.registerMultipleLocalResources(
shipOnlyFiles,
shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
paths,
Path.CUR_DIR,
envShipResourceList,
fileReplication);

// Upload and register user jars
final List<String> userClassPaths = fileUploader.registerMultipleLocalResources(
userJarFiles,
paths,
@@ -20,6 +20,7 @@

import org.apache.flink.util.FlinkException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -99,6 +100,21 @@ static YarnLocalResourceDescriptor fromString(String desc) throws Exception {
}
}

static YarnLocalResourceDescriptor fromFileStatus(
final String key,
final FileStatus fileStatus,
final LocalResourceVisibility visibility) {
checkNotNull(key);
checkNotNull(fileStatus);
checkNotNull(visibility);
return new YarnLocalResourceDescriptor(
key,
fileStatus.getPath(),
fileStatus.getLen(),
fileStatus.getModificationTime(),
visibility);
}
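
// Editor's sketch, not part of the commit: intended use of the factory above for
// a file that already lives on the cluster file system, mirroring the remote-path
// branch added to registerSingleLocalResource. The key and path are invented.
//
//   FileStatus status = fileSystem.getFileStatus(new Path("hdfs:///flink/usrlib/my-job.jar"));
//   YarnLocalResourceDescriptor descriptor =
//       YarnLocalResourceDescriptor.fromFileStatus("my-job.jar", status, LocalResourceVisibility.APPLICATION);
//   localResources.put("my-job.jar", descriptor.toLocalResource());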

@Override
public String toString() {
return String.format(STRING_FORMAT, resourceKey, path.toString(), size, modificationTime, visibility);
@@ -118,7 +118,7 @@ public void testCopyFromLocalRecursiveWithScheme() throws Exception {
final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
final Path targetDir = targetFileSystem.getWorkingDirectory();

testCopyFromLocalRecursive(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true);
testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true, false);
}

/**
@@ -130,7 +130,7 @@ public void testCopyFromLocalRecursiveWithoutScheme() throws Exception {
final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
final Path targetDir = targetFileSystem.getWorkingDirectory();

testCopyFromLocalRecursive(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, false);
testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, false, false);
}

/**
@@ -144,8 +144,16 @@ public void testCopySingleFileFromLocal() throws IOException, URISyntaxException
testCopySingleFileFromLocal(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder);
}

@Test
public void testRegisterMultipleLocalResourcesWithRemoteFiles() throws Exception {
final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
final Path targetDir = targetFileSystem.getWorkingDirectory();

testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true, true);
}

/**
* Verifies that nested directories are properly copied with the given filesystem and paths.
* Verifies that nested directories are properly copied and registered with the given filesystem and paths.
*
* @param targetFileSystem
* file system of the target path
Expand All @@ -157,24 +165,21 @@ public void testCopySingleFileFromLocal() throws IOException, URISyntaxException
* JUnit temporary folder rule to create the source directory with
* @param addSchemeToLocalPath
* whether to add the <tt>file://</tt> scheme to the local path to copy from
* @param useRemoteFiles
* whether to register the resources from remote files
*/
static void testCopyFromLocalRecursive(
static void testRegisterMultipleLocalResources(
FileSystem targetFileSystem,
Path targetDir,
String localResourceDirectory,
TemporaryFolder tempFolder,
boolean addSchemeToLocalPath) throws Exception {
boolean addSchemeToLocalPath,
boolean useRemoteFiles) throws Exception {

// directory must not yet exist
assertFalse(targetFileSystem.exists(targetDir));

final File srcDir = tempFolder.newFolder();
final Path srcPath;
if (addSchemeToLocalPath) {
srcPath = new Path("file://" + srcDir.getAbsolutePath());
} else {
srcPath = new Path(srcDir.getAbsolutePath());
}

final HashMap<String /* (relative) path */, /* contents */ String> srcFiles = new HashMap<>(4);

Expand All @@ -187,6 +192,18 @@ static void testCopyFromLocalRecursive(

generateFilesInDirectory(srcDir, srcFiles);

final Path srcPath;
if (useRemoteFiles) {
srcPath = new Path(hdfsRootPath.toString() + "/tmp/remoteFiles");
hdfsCluster.getFileSystem().copyFromLocalFile(new Path(srcDir.getAbsolutePath()), srcPath);
} else {
if (addSchemeToLocalPath) {
srcPath = new Path("file://" + srcDir.getAbsolutePath());
} else {
srcPath = new Path(srcDir.getAbsolutePath());
}
}

// copy the created directory recursively:
try {
final List<Path> remotePaths = new ArrayList<>();
@@ -196,13 +213,13 @@ static void testCopyFromLocalRecursive(
targetFileSystem, targetDir, Collections.emptyList(), applicationId);

final List<String> classpath = uploader.registerMultipleLocalResources(
Collections.singletonList(new File(srcPath.toUri().getPath())),
Collections.singletonList(srcPath),
remotePaths,
localResourceDirectory,
new ArrayList<>(),
DFSConfigKeys.DFS_REPLICATION_DEFAULT);

final Path basePath = new Path(localResourceDirectory, srcDir.getName());
final Path basePath = new Path(localResourceDirectory, srcPath.getName());
final Path nestedPath = new Path(basePath, "nested");
assertThat(
classpath,
@@ -260,7 +277,7 @@ private static void testCopySingleFileFromLocal(
targetFileSystem, targetDir, Collections.emptyList(), applicationId);

final List<String> classpath = uploader.registerMultipleLocalResources(
Collections.singletonList(new File(srcDir, localFile)),
Collections.singletonList(new Path(srcDir.getAbsolutePath(), localFile)),
remotePaths,
localResourceDirectory,
new ArrayList<>(),
@@ -165,8 +165,8 @@ private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws
try {
final Path directory = new Path(basePath, pathSuffix);

YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
new org.apache.hadoop.fs.Path(directory.toUri()), Path.CUR_DIR, tempFolder, false);
YarnFileStageTest.testRegisterMultipleLocalResources(fs.getHadoopFileSystem(),
new org.apache.hadoop.fs.Path(directory.toUri()), Path.CUR_DIR, tempFolder, false, false);
} finally {
// clean up
fs.delete(basePath, true);
