diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java index 9846c951113cf..13e59c53105ff 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java @@ -71,6 +71,16 @@ public void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion() t createDefaultConfiguration(getTestingJar(), YarnConfigOptions.UserJarInclusion.DISABLED))); } + @Test + public void testApplicationClusterWithRemoteUserJar() throws Exception { + final Path testingJar = getTestingJar(); + final Path remoteJar = new Path(miniDFSCluster.getFileSystem().getHomeDirectory(), testingJar.getName()); + miniDFSCluster.getFileSystem().copyFromLocalFile(testingJar, remoteJar); + runTest( + () -> deployApplication( + createDefaultConfiguration(remoteJar, YarnConfigOptions.UserJarInclusion.DISABLED))); + } + private void deployApplication(Configuration configuration) throws Exception { try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 564fc02fb7236..54dd06309415e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -492,6 +492,11 @@ static ContainerLaunchContext createTaskExecutorContext( return ctx; } + static boolean isRemotePath(String path) throws IOException { + org.apache.flink.core.fs.Path flinkPath = new org.apache.flink.core.fs.Path(path); + return flinkPath.getFileSystem().isDistributedFS(); + } + private static List decodeYarnLocalResourceDescriptorListFromString(String resources) throws Exception { final List resourceDescriptors = new ArrayList<>(); for (String shipResourceDescStr : 
resources.split(LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR)) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java index 3381561cf5374..1f4e39c6a93ae 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java @@ -38,6 +38,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.SimpleFileVisitor; @@ -108,9 +109,9 @@ public void close() { } /** - * Uploads and registers a single resource and adds it to localResources. + * Register a single local/remote resource and adds it to localResources. * @param key the key to add the resource under - * @param localSrcPath local path to the file + * @param resourcePath path of the resource to be registered * @param relativeDstPath the relative path at the target location * (this will be prefixed by the application-specific directory) * @param replicationFactor number of replications of a remote file to be created @@ -118,24 +119,31 @@ public void close() { */ YarnLocalResourceDescriptor registerSingleLocalResource( final String key, - final Path localSrcPath, + final Path resourcePath, final String relativeDstPath, final int replicationFactor) throws IOException { - final FileStatus fileStatus = providedSharedLibs.get(localSrcPath.getName()); + if (Utils.isRemotePath(resourcePath.toString())) { + final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath); + LOG.debug("Using remote file {} to register local resource", fileStatus.getPath()); + + final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor + .fromFileStatus(key, fileStatus, LocalResourceVisibility.APPLICATION); + localResources.put(key, descriptor.toLocalResource()); 
+ return descriptor; + } + + final FileStatus fileStatus = providedSharedLibs.get(resourcePath.getName()); if (fileStatus != null) { - LOG.debug("Using provided file {} instead of the local {}", fileStatus.getPath(), localSrcPath); - - return new YarnLocalResourceDescriptor( - fileStatus.getPath().getName(), - fileStatus.getPath(), - fileStatus.getLen(), - fileStatus.getModificationTime(), - LocalResourceVisibility.PUBLIC); + LOG.debug("Using provided file {} instead of the local {}", fileStatus.getPath(), resourcePath); + + final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor + .fromFileStatus(fileStatus.getPath().getName(), fileStatus, LocalResourceVisibility.PUBLIC); + return descriptor; } - final File localFile = new File(localSrcPath.toUri().getPath()); - final Tuple2 remoteFileInfo = uploadLocalFileToRemote(localSrcPath, relativeDstPath, replicationFactor); + final File localFile = new File(resourcePath.toUri().getPath()); + final Tuple2 remoteFileInfo = uploadLocalFileToRemote(resourcePath, relativeDstPath, replicationFactor); final YarnLocalResourceDescriptor descriptor = new YarnLocalResourceDescriptor( key, remoteFileInfo.f0, @@ -172,10 +180,11 @@ Tuple2 uploadLocalFileToRemote( /** * Recursively uploads (and registers) any (user and system) files in shipFiles except - * for files matching "flink-dist*.jar" which should be uploaded separately. + * for files matching "flink-dist*.jar" which should be uploaded separately. If it is + * already a remote file, the uploading will be skipped. 
* * @param shipFiles - * files to upload + * local or remote files to register as Yarn local resources * @param remotePaths * paths of the remote resources (uploaded resources will be added) * @param localResourcesDirectory @@ -188,7 +197,7 @@ Tuple2 uploadLocalFileToRemote( * @return list of class paths with the the proper resource keys from the registration */ List registerMultipleLocalResources( - final Collection shipFiles, + final Collection shipFiles, final List remotePaths, final String localResourcesDirectory, final List envShipResourceList, @@ -197,23 +206,36 @@ List registerMultipleLocalResources( checkArgument(replicationFactor >= 1); final List localPaths = new ArrayList<>(); final List relativePaths = new ArrayList<>(); - for (File shipFile : shipFiles) { - if (shipFile.isDirectory()) { - // add directories to the classpath - final java.nio.file.Path shipPath = shipFile.toPath(); - final java.nio.file.Path parentPath = shipPath.getParent(); - Files.walkFileTree(shipPath, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) { - localPaths.add(new Path(file.toUri())); - relativePaths.add(new Path(localResourcesDirectory, parentPath.relativize(file).toString())); - return FileVisitResult.CONTINUE; + for (Path shipFile : shipFiles) { + if (Utils.isRemotePath(shipFile.toString())) { + if (fileSystem.isDirectory(shipFile)) { + final URI parentURI = shipFile.getParent().toUri(); + final RemoteIterator iterable = fileSystem.listFiles(shipFile, true); + while (iterable.hasNext()) { + final Path current = iterable.next().getPath(); + localPaths.add(current); + relativePaths.add(new Path(localResourcesDirectory, parentURI.relativize(current.toUri()).getPath())); } - }); + continue; + } } else { - localPaths.add(new Path(shipFile.toURI())); - relativePaths.add(new Path(localResourcesDirectory, shipFile.getName())); + final File file = new File(shipFile.toUri().getPath()); + if 
(file.isDirectory()) { + final java.nio.file.Path shipPath = file.toPath(); + final java.nio.file.Path parentPath = shipPath.getParent(); + Files.walkFileTree(shipPath, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) { + localPaths.add(new Path(file.toUri())); + relativePaths.add(new Path(localResourcesDirectory, parentPath.relativize(file).toString())); + return FileVisitResult.CONTINUE; + } + }); + continue; + } } + localPaths.add(shipFile); + relativePaths.add(new Path(localResourcesDirectory, shipFile.getName())); } final Set archives = new HashSet<>(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index be809060f1e76..fc96b7a42d837 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -736,14 +736,14 @@ private ApplicationReport startAppMaster( 1)); } - final Set userJarFiles = new HashSet<>(); + final Set userJarFiles = new HashSet<>(); if (jobGraph != null) { - userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet())); + userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet())); } final List jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create); if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) { - userJarFiles.addAll(jarUrls.stream().map(File::new).collect(Collectors.toSet())); + userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet())); } int yarnFileReplication = yarnConfiguration.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); @@ -753,10 +753,9 @@ private ApplicationReport startAppMaster( 
// only for per job mode if (jobGraph != null) { for (Map.Entry entry : jobGraph.getUserArtifacts().entrySet()) { - org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath); // only upload local files - if (!path.getFileSystem().isDistributedFS()) { - Path localPath = new Path(path.getPath()); + if (!Utils.isRemotePath(entry.getValue().filePath)) { + Path localPath = new Path(entry.getValue().filePath); Tuple2 remoteFileInfo = fileUploader.uploadLocalFileToRemote(localPath, entry.getKey(), fileReplication); jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString()); @@ -775,7 +774,7 @@ private ApplicationReport startAppMaster( // and upload the remaining dependencies as local resources with APPLICATION visibility. final List systemClassPaths = fileUploader.registerProvidedLocalResources(); final List uploadedDependencies = fileUploader.registerMultipleLocalResources( - systemShipFiles, + systemShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), paths, Path.CUR_DIR, envShipResourceList, @@ -784,12 +783,13 @@ private ApplicationReport startAppMaster( // upload and register ship-only files fileUploader.registerMultipleLocalResources( - shipOnlyFiles, + shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), paths, Path.CUR_DIR, envShipResourceList, fileReplication); + // Upload and register user jars final List userClassPaths = fileUploader.registerMultipleLocalResources( userJarFiles, paths, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java index fd00227b82ba4..a1f2fe6e66ebf 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnLocalResourceDescriptor.java @@ -20,6 +20,7 @@ import org.apache.flink.util.FlinkException; +import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -99,6 +100,21 @@ static YarnLocalResourceDescriptor fromString(String desc) throws Exception { } } + static YarnLocalResourceDescriptor fromFileStatus( + final String key, + final FileStatus fileStatus, + final LocalResourceVisibility visibility) { + checkNotNull(key); + checkNotNull(fileStatus); + checkNotNull(visibility); + return new YarnLocalResourceDescriptor( + key, + fileStatus.getPath(), + fileStatus.getLen(), + fileStatus.getModificationTime(), + visibility); + } + @Override public String toString() { return String.format(STRING_FORMAT, resourceKey, path.toString(), size, modificationTime, visibility); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java index 738140713edae..0a1cbae2c73dc 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -118,7 +118,7 @@ public void testCopyFromLocalRecursiveWithScheme() throws Exception { final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); final Path targetDir = targetFileSystem.getWorkingDirectory(); - testCopyFromLocalRecursive(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true); + testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true, false); } /** @@ -130,7 +130,7 @@ public void testCopyFromLocalRecursiveWithoutScheme() throws Exception { final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); final Path targetDir = targetFileSystem.getWorkingDirectory(); - testCopyFromLocalRecursive(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, false); + 
testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, false, false); } /** @@ -144,8 +144,16 @@ public void testCopySingleFileFromLocal() throws IOException, URISyntaxException testCopySingleFileFromLocal(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder); } + @Test + public void testRegisterMultipleLocalResourcesWithRemoteFiles() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testRegisterMultipleLocalResources(targetFileSystem, targetDir, LOCAL_RESOURCE_DIRECTORY, tempFolder, true, true); + } + /** - * Verifies that nested directories are properly copied with the given filesystem and paths. + * Verifies that nested directories are properly copied and registered with the given filesystem and paths. * * @param targetFileSystem * file system of the target path @@ -157,24 +165,21 @@ public void testCopySingleFileFromLocal() throws IOException, URISyntaxException * JUnit temporary folder rule to create the source directory with * @param addSchemeToLocalPath * whether add the file:// scheme to the local path to copy from + * @param useRemoteFiles + * whether register the local resource with remote files */ - static void testCopyFromLocalRecursive( + static void testRegisterMultipleLocalResources( FileSystem targetFileSystem, Path targetDir, String localResourceDirectory, TemporaryFolder tempFolder, - boolean addSchemeToLocalPath) throws Exception { + boolean addSchemeToLocalPath, + boolean useRemoteFiles) throws Exception { // directory must not yet exist assertFalse(targetFileSystem.exists(targetDir)); final File srcDir = tempFolder.newFolder(); - final Path srcPath; - if (addSchemeToLocalPath) { - srcPath = new Path("file://" + srcDir.getAbsolutePath()); - } else { - srcPath = new Path(srcDir.getAbsolutePath()); - } final HashMap srcFiles = new HashMap<>(4); @@ -187,6 +192,18 @@ static 
void testCopyFromLocalRecursive( generateFilesInDirectory(srcDir, srcFiles); + final Path srcPath; + if (useRemoteFiles) { + srcPath = new Path(hdfsRootPath.toString() + "/tmp/remoteFiles"); + hdfsCluster.getFileSystem().copyFromLocalFile(new Path(srcDir.getAbsolutePath()), srcPath); + } else { + if (addSchemeToLocalPath) { + srcPath = new Path("file://" + srcDir.getAbsolutePath()); + } else { + srcPath = new Path(srcDir.getAbsolutePath()); + } + } + // copy the created directory recursively: try { final List remotePaths = new ArrayList<>(); @@ -196,13 +213,13 @@ static void testCopyFromLocalRecursive( targetFileSystem, targetDir, Collections.emptyList(), applicationId); final List classpath = uploader.registerMultipleLocalResources( - Collections.singletonList(new File(srcPath.toUri().getPath())), + Collections.singletonList(srcPath), remotePaths, localResourceDirectory, new ArrayList<>(), DFSConfigKeys.DFS_REPLICATION_DEFAULT); - final Path basePath = new Path(localResourceDirectory, srcDir.getName()); + final Path basePath = new Path(localResourceDirectory, srcPath.getName()); final Path nestedPath = new Path(basePath, "nested"); assertThat( classpath, @@ -260,7 +277,7 @@ private static void testCopySingleFileFromLocal( targetFileSystem, targetDir, Collections.emptyList(), applicationId); final List classpath = uploader.registerMultipleLocalResources( - Collections.singletonList(new File(srcDir, localFile)), + Collections.singletonList(new Path(srcDir.getAbsolutePath(), localFile)), remotePaths, localResourceDirectory, new ArrayList<>(), diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java index bfde0c98c2031..56d1e328b54e6 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java @@ -165,8 +165,8 @@ private void 
testRecursiveUploadForYarn(String scheme, String pathSuffix) throws try { final Path directory = new Path(basePath, pathSuffix); - YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(), - new org.apache.hadoop.fs.Path(directory.toUri()), Path.CUR_DIR, tempFolder, false); + YarnFileStageTest.testRegisterMultipleLocalResources(fs.getHadoopFileSystem(), + new org.apache.hadoop.fs.Path(directory.toUri()), Path.CUR_DIR, tempFolder, false, false); } finally { // clean up fs.delete(basePath, true);