Skip to content

Commit

Permalink
[FLINK-3675][yarn] improvements to library shipping
Browse files Browse the repository at this point in the history
- always ship the lib folder
- properly setup the classpath from the supplied ship files
- cleanup deploy() method of YarnClusterDescriptor
- add test case

This closes apache#2187
  • Loading branch information
mxm committed Jul 1, 2016
1 parent 0e8be41 commit 0483ba5
Show file tree
Hide file tree
Showing 17 changed files with 352 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public class CliFrontend {
private static final String ACTION_SAVEPOINT = "savepoint";

// config dir parameters
public static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";

Expand Down Expand Up @@ -153,7 +152,7 @@ public CliFrontend(String configDir) throws Exception {
// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();

try {
Expand Down Expand Up @@ -1022,16 +1021,16 @@ public static void main(String[] args) {
// --------------------------------------------------------------------------------------------

public static String getConfigurationDirectoryFromEnv() {
String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);
String envLocation = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
String location = envLocation != null ? envLocation : System.getProperty(ConfigConstants.ENV_FLINK_CONF_DIR);

if (location != null) {
if (new File(location).exists()) {
return location;
}
else {
throw new RuntimeException("The config directory '" + location + "', specified in the '" +
ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
}
}
else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
Expand All @@ -1043,7 +1042,7 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
else {
throw new RuntimeException("The configuration directory was not specified. " +
"Please specify the directory containing the configuration file through the '" +
ENV_CONFIG_DIRECTORY + "' environment variable.");
ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
}
return location;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,15 @@ public final class ConfigConstants {
/** ZooKeeper default leader port. */
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;

// ----------------------------- Environment Variables ----------------------------

/** The environment variable name which contains the location of the configuration directory */
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";

/** The environment variable name which contains the location of the lib folder */
public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";


/**
* Not instantiable.
*/
Expand Down
5 changes: 5 additions & 0 deletions flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
FLINK_ROOT_DIR=`dirname "$SYMLINK_RESOLVED_BIN"`
FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib

### Exported environment variables ###
export FLINK_CONF_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar "$@"

Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void write(int b) {}

// Unset FLINK_CONF_DIR, as this is a precondition for this test to work properly
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
TestBaseUtils.setEnv(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;
Expand Down Expand Up @@ -61,14 +61,14 @@ public void testDynamicProperties() throws IOException {
File tmpFolder = tmp.newFolder();
File fakeConf = new File(tmpFolder, "flink-conf.yaml");
fakeConf.createNewFile();
map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
Options options = new Options();
cli.addGeneralOptions(options);
cli.addRunOptions(options);

CommandLineParser parser = new PosixParser();
CommandLineParser parser = new DefaultParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public TestingYarnClusterDescriptor() {
filesToShip.add(testingJar);
filesToShip.add(testingRuntimeJar);

setShipFiles(filesToShip);
addShipFiles(filesToShip);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void testMultipleAMKill() throws Exception {
flinkYarnClient.setJobManagerMemory(768);
flinkYarnClient.setTaskManagerMemory(1024);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));

String confDirPath = System.getenv("FLINK_CONF_DIR");
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
flinkYarnClient.setConfigurationDirectory(confDirPath);

String fsStateHandlePath = tmp.getRoot().getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.yarn;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
Expand Down Expand Up @@ -221,8 +222,8 @@ public void testJavaAPI() {
flinkYarnClient.setJobManagerMemory(768);
flinkYarnClient.setTaskManagerMemory(1024);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
flinkYarnClient.setConfigurationDirectory(confDirPath);
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class YarnClusterDescriptorTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}
*/
@Test
public void testExplicitLibShipping() throws Exception {
AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));

descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
descriptor.setFlinkConfiguration(new Configuration());

File libFile = temporaryFolder.newFile("libFile.jar");
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();

Assert.assertFalse(descriptor.shipFiles.contains(libFile));
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));

List<File> shipFiles = new ArrayList<>();
shipFiles.add(libFile);
shipFiles.add(libFolder);

descriptor.addShipFiles(shipFiles);

Assert.assertTrue(descriptor.shipFiles.contains(libFile));
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));

// only execute part of the deployment to test for shipped files
Set<File> effectiveShipFiles = new HashSet<>();
descriptor.addLibFolderToShipFiles(effectiveShipFiles);

Assert.assertEquals(0, effectiveShipFiles.size());
Assert.assertEquals(2, descriptor.shipFiles.size());
Assert.assertTrue(descriptor.shipFiles.contains(libFile));
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
}

/**
* Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}
*/
@Test
public void testEnvironmentLibShipping() throws Exception {
AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();

descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
descriptor.setFlinkConfiguration(new Configuration());

File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
File libFile = new File(libFolder, "libFile.jar");
libFile.createNewFile();

Set<File> effectiveShipFiles = new HashSet<>();

final Map<String, String> oldEnv = System.getenv();
try {
Map<String, String> env = new HashMap<>(1);
env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
TestBaseUtils.setEnv(env);
// only execute part of the deployment to test for shipped files
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
} finally {
TestBaseUtils.setEnv(oldEnv);
}

// only add the ship the folder, not the contents
Assert.assertFalse(effectiveShipFiles.contains(libFile));
Assert.assertTrue(effectiveShipFiles.contains(libFolder));
Assert.assertFalse(descriptor.shipFiles.contains(libFile));
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -365,7 +366,7 @@ public static void startYARNWithConfig(Configuration conf) {
File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
Assert.assertNotNull(flinkConfDirPath);

map.put(CliFrontend.ENV_CONFIG_DIRECTORY, flinkConfDirPath.getParent());
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());

File yarnConfFile = writeYarnSiteConfigXML(conf);
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
Expand Down Expand Up @@ -596,7 +597,7 @@ public void sendStop() {
public static void teardown() throws Exception {
// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
TestBaseUtils.setEnv(map);

// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
Expand Down
Loading

0 comments on commit 0483ba5

Please sign in to comment.