Skip to content

Commit

Permalink
[FLINK-34083][yarn][refactor] Move BootstrapTools#getTaskManagerShell…
Browse files Browse the repository at this point in the history
…Command to flink-yarn Utils
  • Loading branch information
Sxnan authored and 1996fanrui committed Jan 19, 2024
1 parent 0b4c6f9 commit 752d3a7
Show file tree
Hide file tree
Showing 5 changed files with 560 additions and 552 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.OperatingSystem;

import org.apache.flink.shaded.guava32.com.google.common.escape.Escaper;
Expand All @@ -42,7 +40,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

Expand Down Expand Up @@ -160,128 +157,11 @@ public static Configuration parseDynamicProperties(CommandLine cmd) {
return config;
}

/**
* Generates the shell command to start a task manager.
*
* @param flinkConfig The Flink configuration.
* @param tmParams Parameters for the task manager.
* @param configDirectory The configuration directory for the flink-conf.yaml
* @param logDirectory The log directory.
* @param hasLogback Uses logback?
* @param hasLog4j Uses log4j?
* @param mainClass The main class to start with.
* @return A String containing the task manager startup command.
*/
public static String getTaskManagerShellCommand(
Configuration flinkConfig,
ContaineredTaskManagerParameters tmParams,
String configDirectory,
String logDirectory,
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
Class<?> mainClass,
String mainArgs) {

final Map<String, String> startCommandValues = new HashMap<>();
startCommandValues.put("java", "$JAVA_HOME/bin/java");

final TaskExecutorProcessSpec taskExecutorProcessSpec =
tmParams.getTaskExecutorProcessSpec();
startCommandValues.put(
"jvmmem", ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));

String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
}
javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;

// krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
javaOpts += " -Djava.security.krb5.conf=krb5.conf";
}
startCommandValues.put("jvmopts", javaOpts);

String logging = "";
if (hasLogback || hasLog4j) {
logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
if (hasLogback) {
logging += " -Dlogback.configurationFile=file:" + configDirectory + "/logback.xml";
}
if (hasLog4j) {
logging += " -Dlog4j.configuration=file:" + configDirectory + "/log4j.properties";
logging +=
" -Dlog4j.configurationFile=file:" + configDirectory + "/log4j.properties";
}
}

startCommandValues.put("logging", logging);
startCommandValues.put("class", mainClass.getName());
startCommandValues.put(
"redirects",
"1> "
+ logDirectory
+ "/taskmanager.out "
+ "2> "
+ logDirectory
+ "/taskmanager.err");

String argsStr =
TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec)
+ " --configDir "
+ configDirectory;
if (!mainArgs.isEmpty()) {
argsStr += " " + mainArgs;
}
startCommandValues.put("args", argsStr);

final String commandTemplate =
flinkConfig.getString(
ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
String startCommand = getStartCommand(commandTemplate, startCommandValues);
LOG.debug("TaskManager start command: " + startCommand);

return startCommand;
}

// ------------------------------------------------------------------------

/** Private constructor to prevent instantiation. */
private BootstrapTools() {}

/**
* Replaces placeholders in the template start command with values from startCommandValues.
*
* <p>If the default template {@link
* ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE} is used, the following keys
* must be present in the map or the resulting command will still contain placeholders:
*
* <ul>
* <li><tt>java</tt> = path to the Java executable
* <li><tt>jvmmem</tt> = JVM memory limits and tweaks
* <li><tt>jvmopts</tt> = misc options for the Java VM
* <li><tt>logging</tt> = logging-related configuration settings
* <li><tt>class</tt> = main class to execute
* <li><tt>args</tt> = arguments for the main class
* <li><tt>redirects</tt> = output redirects
* </ul>
*
* @param template a template start command with placeholders
* @param startCommandValues a replacement map <tt>placeholder -&gt; value</tt>
* @return the start command with placeholders filled in
*/
public static String getStartCommand(String template, Map<String, String> startCommandValues) {
for (Map.Entry<String, String> variable : startCommandValues.entrySet()) {
template =
template.replace("%" + variable.getKey() + "%", variable.getValue())
.replace(" ", " ")
.trim();
}
return template;
}

/**
* Set temporary configuration directories if necessary.
*
Expand Down
Loading

0 comments on commit 752d3a7

Please sign in to comment.