Skip to content

Commit

Permalink
[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS…
Browse files Browse the repository at this point in the history
… paths.

This closes apache#3202
  • Loading branch information
Haohui Mai authored and StephanEwen committed Feb 13, 2017
1 parent 30c5b77 commit 186b123
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 445 deletions.
223 changes: 222 additions & 1 deletion flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +58,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;

/**
* Utility class that provides helper methods to work with Apache Hadoop YARN.
*/
Expand Down Expand Up @@ -107,7 +114,7 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
addToEnvironment(
appMasterEnv,
Environment.CLASSPATH.name(),
appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
appMasterEnv.get(ENV_FLINK_CLASSPATH));
String[] applicationClassPathEntries = conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
Expand Down Expand Up @@ -264,4 +271,218 @@ public static Map<String, String> getEnvironmentVariables(String envPrefix, org.
}
return result;
}

/**
 * Creates the launch context, which describes how to bring up a TaskExecutor / TaskManager process in
 * an allocated YARN container.
 *
 * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
 * container launch context. The launch context then ensures that those resources will be
 * copied into the container's transient working directory.
 *
 * <p>The following environment variables must be present in {@code env}, otherwise a
 * {@link RuntimeException} is thrown: {@code FLINK_JAR_PATH}, {@code ENV_APP_ID},
 * {@code ENV_CLIENT_HOME_DIR}, {@code ENV_CLIENT_SHIP_FILES}, {@code ENV_HADOOP_USER_NAME} and
 * {@code ENV_FLINK_CLASSPATH} (all constants of {@code YarnConfigKeys}). The keytab, keytab
 * principal, yarn-site and krb5 entries are optional.
 *
 * @param flinkConfig
 *         The Flink configuration object.
 * @param yarnConfig
 *         The YARN configuration object.
 * @param env
 *         The environment variables.
 * @param tmParams
 *         The TaskExecutor container memory parameters.
 * @param taskManagerConfig
 *         The configuration for the TaskExecutors.
 * @param workingDirectory
 *         The current application master container's working directory.
 * @param taskManagerMainClass
 *         The class with the main method.
 * @param log
 *         The logger.
 *
 * @return The launch context for the TaskManager processes.
 *
 * @throws Exception Thrown if the launch context could not be created, for example if
 *                   the resources could not be copied.
 */
static ContainerLaunchContext createTaskExecutorContext(
		org.apache.flink.configuration.Configuration flinkConfig,
		YarnConfiguration yarnConfig,
		Map<String, String> env,
		ContaineredTaskManagerParameters tmParams,
		org.apache.flink.configuration.Configuration taskManagerConfig,
		String workingDirectory,
		Class<?> taskManagerMainClass,
		Logger log) throws Exception {

	// get and validate all relevant variables

	String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
	require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);

	String appId = env.get(YarnConfigKeys.ENV_APP_ID);
	require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);

	String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
	require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);

	String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
	require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);

	// Only the presence of the user name is validated here; the value placed into the
	// container environment further below is taken from the current UserGroupInformation.
	require(env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME) != null,
		"Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);

	final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
	log.info("TM:remote keytab path obtained {}", remoteKeytabPath);

	final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
	log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);

	final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
	log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);

	final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
	log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);

	String classPathString = env.get(ENV_FLINK_CLASSPATH);
	require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);

	// register keytab (optional)
	LocalResource keytabResource = null;
	if (remoteKeytabPath != null) {
		log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
		keytabResource = createRemoteLocalResource(remoteKeytabPath, yarnConfig);
	}

	// To support Yarn Secure Integration Test Scenario:
	// yarn-site and krb5 are only shipped when BOTH paths are present.
	LocalResource yarnConfResource = null;
	LocalResource krb5ConfResource = null;
	boolean hasKrb5 = false;
	if (remoteYarnConfPath != null && remoteKrb5Path != null) {
		log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
		yarnConfResource = createRemoteLocalResource(remoteYarnConfPath, yarnConfig);

		log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
		krb5ConfResource = createRemoteLocalResource(remoteKrb5Path, yarnConfig);

		hasKrb5 = true;
	}

	// register Flink Jar; the file system is resolved from the path itself,
	// so the jar does not have to live on HDFS
	LocalResource flinkJar = createRemoteLocalResource(remoteFlinkJarPath, yarnConfig);

	// register conf with local fs
	LocalResource flinkConf = Records.newRecord(LocalResource.class);
	{
		// write the TaskManager configuration to a local file; the random prefix keeps
		// files written for different containers from overwriting each other
		final File taskManagerConfigFile =
			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
		log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
		BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);

		Path homeDirPath = new Path(clientHomeDir);
		FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
		// NOTE(review): setupLocalResource presumably copies the file below the client
		// home directory and fills in flinkConf -- confirm against its definition.
		setupLocalResource(fs, appId,
			new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));

		log.info("Prepared local resource for modified yaml: {}", flinkConf);
	}

	Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
	taskManagerLocalResources.put("flink.jar", flinkJar);
	taskManagerLocalResources.put("flink-conf.yaml", flinkConf);

	// To support Yarn Secure Integration Test Scenario
	if (yarnConfResource != null && krb5ConfResource != null) {
		taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
		taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
	}

	if (keytabResource != null) {
		taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
	}

	// prepare additional files to be shipped; each one is registered under its file name
	for (String pathStr : shipListString.split(",")) {
		if (!pathStr.isEmpty()) {
			taskManagerLocalResources.put(
				new Path(pathStr).getName(),
				createRemoteLocalResource(pathStr, yarnConfig));
		}
	}

	// now that all resources are prepared, we can create the launch context

	log.info("Creating container launch context for TaskManagers");

	boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
	boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();

	String launchCommand = BootstrapTools.getTaskManagerShellCommand(
		flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
		hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);

	log.info("Starting TaskManagers with command: {}", launchCommand);

	ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
	ctx.setCommands(Collections.singletonList(launchCommand));
	ctx.setLocalResources(taskManagerLocalResources);

	Map<String, String> containerEnv = new HashMap<>(tmParams.taskManagerEnv());

	// add YARN classpath, etc to the container environment
	containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
	setupYarnClassPath(yarnConfig, containerEnv);

	containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

	// keytab settings are only forwarded when both the path and the principal are known
	if (remoteKeytabPath != null && remoteKeytabPrincipal != null) {
		containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
		containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
	}

	ctx.setEnvironment(containerEnv);

	// Attaching the security tokens is best effort: a failure is logged and the launch
	// context is still returned, just without tokens.
	try (DataOutputBuffer dob = new DataOutputBuffer()) {
		log.debug("Adding security tokens to Task Executor Container launch Context....");
		UserGroupInformation user = UserGroupInformation.getCurrentUser();
		Credentials credentials = user.getCredentials();
		credentials.writeTokenStorageToStream(dob);
		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
		ctx.setTokens(securityTokens);
	}
	catch (Throwable t) {
		log.error("Getting current user info failed when trying to launch the container", t);
	}

	return ctx;
}

/**
 * Creates a {@link LocalResource} record for a file that already exists at the given remote
 * location and registers it via {@code registerLocalResource}. The file system is resolved
 * from the path, using the given YARN configuration.
 *
 * @param remotePath The location of the remote file.
 * @param yarnConfig The YARN configuration used to resolve the path's file system.
 *
 * @return The registered local resource record.
 *
 * @throws Exception Thrown if the file system cannot be resolved or the registration fails.
 */
private static LocalResource createRemoteLocalResource(
		String remotePath,
		YarnConfiguration yarnConfig) throws Exception {

	LocalResource resource = Records.newRecord(LocalResource.class);
	Path path = new Path(remotePath);
	FileSystem fs = path.getFileSystem(yarnConfig);
	registerLocalResource(fs, path, resource);
	return resource;
}

/**
 * Checks a precondition and fails with a {@link RuntimeException} when it does not hold.
 *
 * <p>The message is only formatted when the check fails, so a passing check never
 * evaluates the format string.
 *
 * @param condition The condition that is expected to be {@code true}.
 * @param message The exception message template, using the format syntax of
 *                {@link String#format(String, Object...)}.
 * @param values The arguments substituted into the message template.
 */
static void require(boolean condition, String message, Object... values) {
	if (condition) {
		return;
	}
	final String formatted = String.format(message, values);
	throw new RuntimeException(formatted);
}
}
Loading

0 comments on commit 186b123

Please sign in to comment.