Skip to content

Commit

Permalink
[FLINK-4040] [dist] Allow JobManager/TaskManager-specific JVM args
Browse files Browse the repository at this point in the history
- Adds two new config keys for the start up scripts:
    * env.java.opts.jobmanager
    * env.java.opts.taskmanager
- These are used in addition to the regular env.java.opts for the JobManager
  and TaskManager, respectively.
- Current behaviour is not changed. This does not address the JMX port
  setting, because that is handled differently.

This closes apache#2143.
  • Loading branch information
uce committed Jun 23, 2016
1 parent 479be14 commit b6e6b81
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 2 deletions.
6 changes: 5 additions & 1 deletion docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ The configuration files for the TaskManagers can be different, Flink does not as

- `env.java.home`: The path to the Java installation to use (DEFAULT: system's default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the java home directory. Can be specified to point to a specific java installation or version. If this option is not specified, the startup scripts also evaluate the `$JAVA_HOME` environment variable.

- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services.
- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` for JobManager or TaskManager-specific options, respectively.

- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.

- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.

- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).

Expand Down
16 changes: 16 additions & 0 deletions flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ readFromConfig() {
DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode

########################################################################################################################
Expand All @@ -99,6 +101,8 @@ KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_RECOVERY_MODE="recovery.mode"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"
Expand Down Expand Up @@ -229,6 +233,18 @@ if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
fi

if [ -z "${FLINK_SSH_OPTS}" ]; then
FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi
Expand Down
3 changes: 3 additions & 0 deletions flink-dist/src/main/flink-bin/bin/jobmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ if [[ $STARTSTOP == "start" ]]; then
export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
fi

# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

# Startup parameters
args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "${EXECUTIONMODE}")
if [ ! -z $HOST ]; then
Expand Down
5 changes: 4 additions & 1 deletion flink-dist/src/main/flink-bin/bin/taskmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ if [[ $STARTSTOP == "start" ]]; then

# if memory allocation mode is lazy and no other JVM options are set,
# set the 'Concurrent Mark Sweep GC'
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then

JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

Expand Down Expand Up @@ -89,6 +89,9 @@ if [[ $STARTSTOP == "start" ]]; then

fi

# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

# Startup parameters
args=("--configDir" "${FLINK_CONF_DIR}")
fi
Expand Down

0 comments on commit b6e6b81

Please sign in to comment.