Skip to content

Commit

Permalink
[FLINK-7989][yarn] Do not upload the flink-dist jar twice
Browse files Browse the repository at this point in the history
We always add the dist.jar ourselves, but it could also be inside a shipped
folder such as the "lib/" folder and was then distributed multiple times.

This closes apache#4951.
  • Loading branch information
Nico Kruber authored and zentol committed Nov 29, 2017
1 parent 4daf922 commit 520a74f
Showing 1 changed file with 69 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,17 @@ public void setLocalJarPath(Path localJarPath) {
this.flinkJarPath = localJarPath;
}

/**
* Adds the given files to the list of files to ship.
*
* <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
* {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)}
* since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
*
* @param shipFiles files to ship
*/
public void addShipFiles(List<File> shipFiles) {
for (File shipFile: shipFiles) {
// remove uberjar from ship list (by default everything in the lib/ folder is added to
// the list of files to ship, but we handle the uberjar separately.
if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
this.shipFiles.add(shipFile);
}
}
this.shipFiles.addAll(shipFiles);
}

public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
Expand Down Expand Up @@ -1048,6 +1051,27 @@ private static Path setupSingleLocalResource(
return resource.f0;
}

/**
* Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately.
*
* @param shipFiles
* files to upload
* @param fs
* file system to upload to
* @param targetHomeDir
* remote home directory to upload to
* @param appId
* application ID
* @param remotePaths
* paths of the remote resources (uploaded resources will be added)
* @param localResources
* map of resources (uploaded resources will be added)
* @param envShipFileList
* list of shipped files in a format understood by {@link Utils#createTaskExecutorContext}
*
* @return list of class paths with the the proper resource keys from the registration
*/
static List<String> uploadAndRegisterFiles(
Collection<File> shipFiles,
FileSystem fs,
Expand All @@ -1068,40 +1092,48 @@ static List<String> uploadAndRegisterFiles(
@Override
public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
throws IOException {
java.nio.file.Path relativePath = parentPath.relativize(file);

String key = relativePath.toString();
try {
Path remotePath = setupSingleLocalResource(
key,
fs,
appId,
new Path(file.toUri()),
localResources,
targetHomeDir,
relativePath.getParent().toString());
remotePaths.add(remotePath);
envShipFileList.append(key).append("=").append(remotePath).append(",");

// add files to the classpath
classPaths.add(key);

return FileVisitResult.CONTINUE;
} catch (URISyntaxException e) {
throw new IOException(e);

if (!(file.getFileName().startsWith("flink-dist") &&
file.getFileName().endsWith("jar"))) {

java.nio.file.Path relativePath = parentPath.relativize(file);

String key = relativePath.toString();
try {
Path remotePath = setupSingleLocalResource(
key,
fs,
appId,
new Path(file.toUri()),
localResources,
targetHomeDir,
relativePath.getParent().toString());
remotePaths.add(remotePath);
envShipFileList.append(key).append("=")
.append(remotePath).append(",");

// add files to the classpath
classPaths.add(key);
} catch (URISyntaxException e) {
throw new IOException(e);
}
}

return FileVisitResult.CONTINUE;
}
});
} else {
Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
String key = shipFile.getName();
Path remotePath = setupSingleLocalResource(
key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
remotePaths.add(remotePath);
envShipFileList.append(key).append("=").append(remotePath).append(",");

// add files to the classpath
classPaths.add(key);
if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
Path shipLocalPath = new Path(shipFile.toURI());
String key = shipFile.getName();
Path remotePath = setupSingleLocalResource(
key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
remotePaths.add(remotePath);
envShipFileList.append(key).append("=").append(remotePath).append(",");

// add files to the classpath
classPaths.add(key);
}
}

}
Expand Down

0 comments on commit 520a74f

Please sign in to comment.