[FLINK-18362][yarn] Fix mistakenly merged commit 0e10fd5
kl0u committed Jul 30, 2020
1 parent 0e10fd5 commit a0227e2
Showing 11 changed files with 232 additions and 112 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/yarn_config_configuration.html
@@ -138,7 +138,7 @@
<td><h5>yarn.ship-archives</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>A semicolon-separated list of archives to be shipped to the YARN cluster. They will be un-packed when localizing.</td>
<td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
</tr>
<tr>
<td><h5>yarn.ship-directories</h5></td>
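For reference, a minimal sketch of setting yarn.ship-archives programmatically instead of via flink-conf.yaml; it uses the same YarnConfigOptions.SHIP_ARCHIVES call that the new test job later in this diff uses, and the archive path is a placeholder:

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class ShipArchivesConfigSketch {
    public static void main(String[] args) {
        final Configuration flinkConfig = new Configuration();
        // Placeholder path; any of the supported archive types (.tar.gz, .tar, .tgz, .dst, .jar, .zip) works.
        // The archive is shipped to the YARN cluster and un-packed when it is localized.
        flinkConfig.set(YarnConfigOptions.SHIP_ARCHIVES, Collections.singletonList("/tmp/resources.tar.gz"));
    }
}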
@@ -33,6 +33,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -100,7 +101,8 @@ public void testCreateTaskExecutorCredentials() throws Exception {
new Path(root.toURI()),
0,
System.currentTimeMillis(),
LocalResourceVisibility.APPLICATION).toString());
LocalResourceVisibility.APPLICATION,
LocalResourceType.FILE).toString());
env = Collections.unmodifiableMap(env);

File credentialFile = temporaryFolder.newFile("container_tokens");
@@ -31,6 +31,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.testjob.YarnTestArchiveJob;
import org.apache.flink.yarn.testjob.YarnTestCacheJob;
import org.apache.flink.yarn.util.TestUtils;

@@ -39,7 +40,9 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
@@ -63,6 +66,9 @@ public class YARNITCase extends YarnTestBase {
private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private static final int sleepIntervalInMS = 100;

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@BeforeClass
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
@@ -101,10 +107,16 @@ public void testPerJobWithProvidedLibDirs() throws Exception {

final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(remoteLib.toString()));

runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph(), false));
}

@Test
public void testPerJobWithArchive() throws Exception {
final Configuration flinkConfig = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
final JobGraph archiveJobGraph = YarnTestArchiveJob.getArchiveJobGraph(tmp.newFolder(), flinkConfig);
runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
}

private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolean withDist) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = withDist
? createYarnClusterDescriptor(configuration)
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.testjob;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Testing job for localizing resources of LocalResourceType.ARCHIVE in per job cluster mode.
*/
public class YarnTestArchiveJob {
private static final List<String> LIST = ImmutableList.of("test1", "test2");

private static final Map<String, String> srcFiles = new HashMap<String, String>() {{
put("local1.txt", "Local text Content1");
put("local2.txt", "Local text Content2");
}};

private static void archiveFilesInDirectory(File directory, String target) throws IOException {
for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
Files.write(
Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()),
entry.getValue().getBytes());
}

try (FileOutputStream fos = new FileOutputStream(target);
GzipCompressorOutputStream gos = new GzipCompressorOutputStream(new BufferedOutputStream(fos));
TarArchiveOutputStream taros = new TarArchiveOutputStream(gos)) {

taros.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
for (File f : directory.listFiles()) {
taros.putArchiveEntry(new TarArchiveEntry(f, directory.getName() + File.separator + f.getName()));

try (FileInputStream fis = new FileInputStream(f); BufferedInputStream bis = new BufferedInputStream(fis)) {
IOUtils.copy(bis, taros);
taros.closeArchiveEntry();
}
}
}

for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
Files.delete(Paths.get(directory.getAbsolutePath() + File.separator + entry.getKey()));
}

}

public static JobGraph getArchiveJobGraph(File testDirectory, Configuration config) throws IOException {

final String archive = testDirectory.getAbsolutePath().concat(".tar.gz");
archiveFilesInDirectory(testDirectory, archive);
config.set(YarnConfigOptions.SHIP_ARCHIVES, Collections.singletonList(archive));

final String localizedPath = testDirectory.getName().concat(".tar.gz") + File.separator + testDirectory.getName();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.addSource(new SourceFunctionWithArchive<>(LIST, localizedPath, TypeInformation.of(String.class)))
.addSink(new DiscardingSink<>());

return env.getStreamGraph().getJobGraph();
}

private static class SourceFunctionWithArchive<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

private final List<T> inputDataset;
private final String resourcePath;
private final TypeInformation<T> returnType;

SourceFunctionWithArchive(List<T> inputDataset, String resourcePath, TypeInformation<T> returnType) {
this.inputDataset = inputDataset;
this.resourcePath = resourcePath;
this.returnType = returnType;
}

public void open(Configuration parameters) throws Exception {
for (Map.Entry<String, String> entry : srcFiles.entrySet()) {
Path path = Paths.get(resourcePath + File.separator + entry.getKey());
String content = new String(Files.readAllBytes(path));
checkArgument(entry.getValue().equals(content), "The content of the unpacked file should be identical to the original file's.");
}
}

@Override
public void run(SourceContext<T> ctx) {
for (T t : inputDataset) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(t);
}
}
}

@Override
public void cancel() {}

@Override
public TypeInformation<T> getProducedType() {
return this.returnType;
}
}
}
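To make the localization layout used by this test concrete: an archive named <dir>.tar.gz shipped via yarn.ship-archives is expected to be un-packed into a directory of the same name in the container's working directory, still containing the archived directory, so a file packed as <dir>/local1.txt is read back as sketched below. The directory name "archive" and the expected content mirror the test's srcFiles and are illustrative only:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LocalizedArchiveReadSketch {
    public static void main(String[] args) throws IOException {
        // Mirrors the test: an archive "archive.tar.gz" containing the directory "archive"
        // is localized under its original file name in the container working directory.
        String localizedPath = "archive.tar.gz" + File.separator + "archive";

        // Read back one of the files that was packed into the archive (names taken from srcFiles above).
        String content = new String(Files.readAllBytes(Paths.get(localizedPath + File.separator + "local1.txt")));
        System.out.println(content); // expected to print "Local text Content1" when run inside the container
    }
}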
@@ -140,27 +140,13 @@ public void close() {
IOUtils.closeQuietly(fileSystem);
}

YarnLocalResourceDescriptor registerSingleLocalResource(
final String key,
final Path resourcePath,
final String relativeDstPath,
final boolean whetherToAddToRemotePaths,
final boolean whetherToAddToEnvShipResourceList) throws IOException {
return registerSingleLocalResource(
key,
resourcePath,
relativeDstPath,
whetherToAddToRemotePaths,
whetherToAddToEnvShipResourceList,
LocalResourceType.FILE);
}

/**
* Register a single local/remote resource and adds it to <tt>localResources</tt>.
* @param key the key to add the resource under
* @param resourcePath path of the resource to be registered
* @param relativeDstPath the relative path at the target location
* (this will be prefixed by the application-specific directory)
* @param resourceType type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
* @param whetherToAddToRemotePaths whether to add the path of local resource to <tt>remotePaths</tt>
* @param whetherToAddToEnvShipResourceList whether to add the local resource to <tt>envShipResourceList</tt>
*
@@ -170,9 +156,9 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
final String key,
final Path resourcePath,
final String relativeDstPath,
final LocalResourceType resourceType,
final boolean whetherToAddToRemotePaths,
final boolean whetherToAddToEnvShipResourceList,
final LocalResourceType resourceType) throws IOException {
final boolean whetherToAddToEnvShipResourceList) throws IOException {

addToRemotePaths(whetherToAddToRemotePaths, resourcePath);

@@ -195,8 +181,7 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
localFile.length(),
remoteFileInfo.f1,
LocalResourceVisibility.APPLICATION,
resourceType
);
resourceType);
addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
localResources.put(key, descriptor.toLocalResource());
return descriptor;
@@ -225,13 +210,6 @@ Tuple2<Path, Long> uploadLocalFileToRemote(
return Tuple2.of(dst, fss[0].getModificationTime());
}

List<String> registerMultipleLocalResources(
final Collection<Path> shipFiles,
final String localResourcesDirectory
) throws IOException {
return registerMultipleLocalResources(shipFiles, localResourcesDirectory, LocalResourceType.FILE);
}

/**
* Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except
* for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately. If it is
@@ -241,14 +219,15 @@ List<String> registerMultipleLocalResources(
* local or remote files to register as Yarn local resources
* @param localResourcesDirectory
* the directory the localResources are uploaded to
* @param resourceType
* type of the resource, which can be one of FILE, PATTERN, or ARCHIVE
*
* @return list of class paths with the proper resource keys from the registration
*/
List<String> registerMultipleLocalResources(
final Collection<Path> shipFiles,
final String localResourcesDirectory,
LocalResourceType resourceType
) throws IOException {
final LocalResourceType resourceType) throws IOException {

final List<Path> localPaths = new ArrayList<>();
final List<Path> relativePaths = new ArrayList<>();
@@ -295,10 +274,9 @@ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes at
key,
localPath,
relativePath.getParent().toString(),
resourceType,
true,
true,
resourceType
);
true);

if (!resourceDescriptor.alreadyRegisteredAsLocalResource()) {
if (key.endsWith("jar")) {
@@ -331,10 +309,9 @@ public YarnLocalResourceDescriptor uploadFlinkDist(final Path localJarPath) thro
localJarPath.getName(),
localJarPath,
"",
LocalResourceType.FILE,
true,
false,
LocalResourceType.FILE
);
false);
return flinkDist;
}

@@ -345,34 +322,26 @@ public YarnLocalResourceDescriptor uploadFlinkDist(final Path localJarPath) thro
* @return list of class paths with the file name
*/
List<String> registerProvidedLocalResources() {
return registerLocalResources(providedSharedLibs, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
}

List<String> registerLocalResources(
Map<String, FileStatus> resources,
LocalResourceVisibility resourceVisibility,
LocalResourceType resourceType) {
checkNotNull(localResources);

final ArrayList<String> classPaths = new ArrayList<>();
resources.forEach(
(fileName, fileStatus) -> {
final Path filePath = fileStatus.getPath();
LOG.debug("Using remote file {} to register local resource", filePath);

final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
.fromFileStatus(fileName, fileStatus, resourceVisibility, resourceType);
localResources.put(fileName, descriptor.toLocalResource());
remotePaths.add(filePath);
envShipResourceList.add(descriptor);

if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
classPaths.add(fileName);
} else if (isFlinkDistJar(filePath.getName())) {
flinkDist = descriptor;

}
});
providedSharedLibs.forEach(
(fileName, fileStatus) -> {
final Path filePath = fileStatus.getPath();
LOG.debug("Using remote file {} to register local resource", filePath);

final YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor
.fromFileStatus(fileName, fileStatus, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
localResources.put(fileName, descriptor.toLocalResource());
remotePaths.add(filePath);
envShipResourceList.add(descriptor);

if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
classPaths.add(fileName);
} else if (isFlinkDistJar(filePath.getName())) {
flinkDist = descriptor;
}
});
return classPaths;
}

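For illustration, a hedged sketch of how a caller might use the re-ordered registration methods above, with the LocalResourceType argument now placed before the two boolean flags. The class name YarnApplicationFileUploader, the package, the use of the Hadoop Path type, and the fileUploader setup are assumptions, since the excerpt shows neither the file name nor an actual caller:

package org.apache.flink.yarn; // assumed package, so the package-private register methods are visible

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.Path; // the unqualified Path in this file is assumed to be the Hadoop Path
import org.apache.hadoop.yarn.api.records.LocalResourceType;

class UploaderCallSketch {

    // Hypothetical caller; YarnApplicationFileUploader is an assumed class name and the
    // uploader instance is assumed to be created elsewhere.
    static List<String> shipArchive(YarnApplicationFileUploader fileUploader, Path archive) throws IOException {
        // Re-ordered signature: the LocalResourceType now precedes the two boolean flags.
        fileUploader.registerSingleLocalResource(
                archive.getName(),
                archive,
                "",                        // relative destination path under the application directory
                LocalResourceType.ARCHIVE,
                true,                      // add to remotePaths
                true);                     // add to envShipResourceList

        // Several resources of one type can be registered in a single call.
        return fileUploader.registerMultipleLocalResources(
                Collections.singletonList(archive),
                ".",                       // localResourcesDirectory
                LocalResourceType.ARCHIVE);
    }
}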
