[FLINK-12343][yarn] Add YARN file replication option
This closes apache#10980.
HuangZhenQiu authored and tisonkun committed Feb 20, 2020
1 parent fae420e commit 9d91aac
Showing 8 changed files with 233 additions and 36 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/yarn_config_configuration.html
@@ -74,6 +74,12 @@
<td>Integer</td>
<td>The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1 otherwise. For this parameter to take effect, your cluster must have CPU scheduling enabled; you can enable it by using the <span markdown="span">`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`</span>.</td>
</tr>
<tr>
<td><h5>yarn.file-replication</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>The replication factor for each local resource file that Flink uploads. If it is not configured, Flink uses the default replication value from the Hadoop configuration.</td>
</tr>
<tr>
<td><h5>yarn.flink-dist-jar</h5></td>
<td style="word-wrap: break-word;">(none)</td>
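For reference, the new option can also be set programmatically on the Flink Configuration before deploying to YARN. The following is a minimal sketch based on the test code later in this commit; the setInteger call and the FILE_REPLICATION option come from the diff, while the wrapper class is hypothetical:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class FileReplicationExample {
	public static void main(String[] args) {
		Configuration configuration = new Configuration();
		// Replicate each uploaded local resource (e.g. the Flink uber jar) 4 times on HDFS.
		// Any value <= 0 (the default is -1) falls back to the default replication
		// configured in the Hadoop configuration (dfs.replication).
		configuration.setInteger(YarnConfigOptions.FILE_REPLICATION, 4);
	}
}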
81 changes: 81 additions & 0 deletions flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Test cases for the deployment of YARN Flink clusters with a customized file replication factor.
 */
public class YARNFileReplicationITCase extends YARNITCase {

	@BeforeClass
	public static void setup() {
		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
		startYARNWithConfig(YARN_CONFIGURATION, true);
	}

	@Test
	public void testPerJobModeWithCustomizedFileReplication() throws Exception {
		Configuration configuration = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
		configuration.setInteger(YarnConfigOptions.FILE_REPLICATION, 4);

		runTest(() -> deployPerjob(
			configuration,
			getTestingJobGraph()));
	}

	@Test
	public void testPerJobModeWithDefaultFileReplication() throws Exception {
		Configuration configuration = createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);

		runTest(() -> deployPerjob(
			configuration,
			getTestingJobGraph()));
	}

	@Override
	protected void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception {
		final FileSystem fs = FileSystem.get(getYarnConfiguration());

		String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();

		Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);
		FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);

		final int flinkFileReplication = configuration.getInteger(YarnConfigOptions.FILE_REPLICATION);
		final int replication = YARN_CONFIGURATION.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);

		// If YarnConfigOptions.FILE_REPLICATION is not set, the replication factor should equal the
		// default value from the YARN/HDFS configuration. Note that HDFS stores the requested factor
		// in the file metadata even when the mini cluster has fewer live datanodes.
		int expectedReplication = flinkFileReplication > 0
			? flinkFileReplication : replication;
		assertEquals(expectedReplication, fsStatus.getReplication());
	}
}
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -69,27 +69,26 @@ public static void setup() {

@Test
public void testPerJobModeWithEnableSystemClassPathIncludeUserJar() throws Exception {
runTest(() -> deployPerjob(YarnConfigOptions.UserJarInclusion.FIRST, getTestingJobGraph()));
runTest(() -> deployPerjob(
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.FIRST),
getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDisableSystemClassPathIncludeUserJar() throws Exception {
runTest(() -> deployPerjob(YarnConfigOptions.UserJarInclusion.DISABLED, getTestingJobGraph()));
runTest(() -> deployPerjob(
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED),
getTestingJobGraph()));
}

@Test
public void testPerJobModeWithDistributedCache() throws Exception {
runTest(() -> deployPerjob(
YarnConfigOptions.UserJarInclusion.DISABLED,
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED),
YarnTestCacheJob.getDistributedCacheJobGraph(tmp.newFolder())));
}

private void deployPerjob(YarnConfigOptions.UserJarInclusion userJarInclusion, JobGraph jobGraph) throws Exception {

Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());
protected void deployPerjob(Configuration configuration, JobGraph jobGraph) throws Exception {

try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

@@ -122,13 +121,15 @@ private void deployPerjob(YarnConfigOptions.UserJarInclusion userJarInclusion, J
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

extraVerification(configuration, applicationId);

waitApplicationFinishedElseKillIt(
applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
}
}
}

private JobGraph getTestingJobGraph() {
protected JobGraph getTestingJobGraph() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

@@ -138,4 +139,15 @@ private JobGraph getTestingJobGraph() {

return env.getStreamGraph().getJobGraph();
}

protected Configuration createDefaultConfiguration(YarnConfigOptions.UserJarInclusion userJarInclusion) {
	Configuration configuration = new Configuration();
	configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
	configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
	configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());

	return configuration;
}

protected void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception { }
}
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -147,8 +148,14 @@ public abstract class YarnTestBase extends TestLogger {
@ClassRule
public static TemporaryFolder tmp = new TemporaryFolder();

// Temp directory for the mini HDFS cluster
@ClassRule
public static TemporaryFolder tmpHDFS = new TemporaryFolder();

protected static MiniYARNCluster yarnCluster = null;

protected static MiniDFSCluster miniDFSCluster = null;

/**
* Uberjar (fat jar) file of Flink.
*/
@@ -168,6 +175,7 @@ public abstract class YarnTestBase extends TestLogger {
protected static File flinkShadedHadoopDir;

protected static File yarnSiteXML = null;
protected static File hdfsSiteXML = null;

private YarnClient yarnClient = null;

@@ -377,6 +385,14 @@ public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFol
}
}

private static void writeHDFSSiteConfigXML(Configuration hdfsSiteConf, File targetFolder) throws IOException {
	hdfsSiteXML = new File(targetFolder, "/hdfs-site.xml");
	try (FileWriter writer = new FileWriter(hdfsSiteXML)) {
		hdfsSiteConf.writeXml(writer);
		writer.flush();
	}
}

/**
* This method checks the written TaskManager and JobManager log files
* for exceptions.
@@ -593,14 +609,18 @@ protected ApplicationReport getOnlyApplicationReport() throws IOException, YarnE
}

public static void startYARNSecureMode(YarnConfiguration conf, String principal, String keytab) {
start(conf, principal, keytab);
start(conf, principal, keytab, false);
}

public static void startYARNWithConfig(YarnConfiguration conf) {
start(conf, null, null);
startYARNWithConfig(conf, false);
}

private static void start(YarnConfiguration conf, String principal, String keytab) {
public static void startYARNWithConfig(YarnConfiguration conf, boolean withDFS) {
start(conf, null, null, withDFS);
}

private static void start(YarnConfiguration conf, String principal, String keytab, boolean withDFS) {
// set the home directory to a temp directory. Flink on YARN uses the home dir to distribute files
File homeDir = null;
try {
@@ -667,6 +687,12 @@ private static void start(YarnConfiguration conf, String principal, String keyta

File targetTestClassesFolder = new File("target/test-classes");
writeYarnSiteConfigXML(conf, targetTestClassesFolder);

if (withDFS) {
	LOG.info("Starting up MiniDFSCluster");
	setMiniDFSCluster(targetTestClassesFolder);
}

map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos
map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
@@ -685,12 +711,28 @@ private static void start(YarnConfiguration conf, String principal, String keyta

}

private static void setMiniDFSCluster(File targetTestClassesFolder) throws Exception {
	if (miniDFSCluster == null) {
		Configuration hdfsConfiguration = new Configuration();
		hdfsConfiguration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpHDFS.getRoot().getAbsolutePath());
		miniDFSCluster = new MiniDFSCluster
			.Builder(hdfsConfiguration)
			.numDataNodes(2)
			.build();
		miniDFSCluster.waitClusterUp();

		hdfsConfiguration = miniDFSCluster.getConfiguration(0);
		writeHDFSSiteConfigXML(hdfsConfiguration, targetTestClassesFolder);
		YARN_CONFIGURATION.addResource(hdfsConfiguration);
	}
}

/**
* Default @BeforeClass impl. Override this to pass a different configuration.
*/
@BeforeClass
public static void setup() throws Exception {
startYARNWithConfig(YARN_CONFIGURATION);
startYARNWithConfig(YARN_CONFIGURATION, false);
}

// -------------------------- Runner -------------------------- //
@@ -963,6 +1005,12 @@ public static void teardown() throws Exception {
yarnCluster = null;
}

if (miniDFSCluster != null) {
	LOG.info("Stopping MiniDFS Cluster");
	miniDFSCluster.shutdown();
	miniDFSCluster = null;
}

// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
@@ -979,6 +1027,10 @@ public static void teardown() throws Exception {
yarnSiteXML.delete();
}

if (hdfsSiteXML != null) {
	hdfsSiteXML.delete();
}

// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
// to <flinkRoot>/target/flink-yarn-tests-*.
// The files from there are picked up by the ./tools/travis_watchdog.sh script
16 changes: 11 additions & 5 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -112,6 +112,8 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
* remote home directory base (will be extended)
* @param relativeTargetPath
* relative target path of the file (will be prefixed by the full home directory we set up)
* @param replication
* the replication factor for the remote file to be created
*
* @return Path to remote file (usually hdfs)
*/
@@ -120,10 +122,11 @@ static Tuple2<Path, LocalResource> setupLocalResource(
String appId,
Path localSrcPath,
Path homedir,
String relativeTargetPath) throws IOException {
String relativeTargetPath,
int replication) throws IOException {

File localFile = new File(localSrcPath.toUri().getPath());
Tuple2<Path, Long> remoteFileInfo = uploadLocalFileToRemote(fs, appId, localSrcPath, homedir, relativeTargetPath);
Tuple2<Path, Long> remoteFileInfo = uploadLocalFileToRemote(fs, appId, localSrcPath, homedir, relativeTargetPath, replication);
// now create the resource instance
LocalResource resource = registerLocalResource(remoteFileInfo.f0, localFile.length(), remoteFileInfo.f1);
return Tuple2.of(remoteFileInfo.f0, resource);
@@ -142,6 +145,8 @@ static Tuple2<Path, LocalResource> setupLocalResource(
* remote home directory base (will be extended)
* @param relativeTargetPath
* relative target path of the file (will be prefixed by the full home directory we set up)
* @param replication
* the replication factor for the remote file to be created
*
* @return Path to remote file (usually hdfs)
*/
@@ -150,7 +155,8 @@ static Tuple2<Path, Long> uploadLocalFileToRemote(
String appId,
Path localSrcPath,
Path homedir,
String relativeTargetPath) throws IOException {
String relativeTargetPath,
int replication) throws IOException {

File localFile = new File(localSrcPath.toUri().getPath());
if (localFile.isDirectory()) {
@@ -167,9 +173,9 @@

Path dst = new Path(homedir, suffix);

LOG.debug("Copying from {} to {}", localSrcPath, dst);

LOG.debug("Copying from {} to {} with replication number {}", localSrcPath, dst, replication);
fs.copyFromLocalFile(false, true, localSrcPath, dst);
fs.setReplication(dst, (short) replication);

// Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote
// file once again which has problems with eventually consistent read-after-write file
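Only five of the eight changed files are rendered on this page; the remaining diffs, which presumably include the definition of FILE_REPLICATION in YarnConfigOptions and the production call sites of the Utils methods above, were not loaded. Judging from the verification logic in YARNFileReplicationITCase, the caller would resolve the effective replication factor roughly as follows; this is a sketch under that assumption, and the helper class name is hypothetical:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class ReplicationResolution {
	// Mirrors the expectation checked in YARNFileReplicationITCase#extraVerification:
	// a configured value > 0 wins, otherwise fall back to the Hadoop default (dfs.replication).
	static int effectiveReplication(Configuration flinkConfig, YarnConfiguration yarnConfig) {
		int flinkFileReplication = flinkConfig.getInteger(YarnConfigOptions.FILE_REPLICATION);
		int hadoopDefault = yarnConfig.getInt(
			DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
		return flinkFileReplication > 0 ? flinkFileReplication : hadoopDefault;
	}
}

The resulting value would then be passed as the new replication parameter of Utils.setupLocalResource / uploadLocalFileToRemote.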