Skip to content

Commit

Permalink
[FLINK-4326] [scripts] Flink foreground services
Browse files Browse the repository at this point in the history
Add a "start-foreground" option to the Flink service scripts which does
not daemonize the service nor redirect output.

This closes apache#3492.
This closes apache#3351.
  • Loading branch information
greghogan authored and uce committed Mar 8, 2017
1 parent e9a5c86 commit 338c30a
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 28 deletions.
6 changes: 3 additions & 3 deletions docs/setup/cluster_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ To stop Flink, there is also a `stop-cluster.sh` script.

### Adding JobManager/TaskManager Instances to a Cluster

You can add both JobManager and TaskManager instances to your running cluster with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.
You can add both JobManager and TaskManager instances to your running cluster with the `bin/jobmanager.sh` and `bin/taskmanager.sh` scripts.

#### Adding a JobManager

~~~bash
bin/jobmanager.sh (start cluster)|stop|stop-all
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
~~~
#### Adding a TaskManager
~~~bash
bin/taskmanager.sh start|stop|stop-all
bin/taskmanager.sh start|start-foreground|stop|stop-all
~~~
Make sure to call these scripts on the hosts on which you want to start/stop the respective instance.
Expand Down
65 changes: 65 additions & 0 deletions flink-dist/src/main/flink-bin/bin/flink-console.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (jobmanager|taskmanager|zookeeper) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

case $SERVICE in
(jobmanager)
CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
;;

(taskmanager)
CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
;;

(zookeeper)
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;

(*)
echo "Unknown service '${SERVICE}'. $USAGE."
exit 1
;;
esac

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
fi
fi

echo "Starting $SERVICE as a console application on host $HOSTNAME."
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
10 changes: 7 additions & 3 deletions flink-dist/src/main/flink-bin/bin/jobmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)"
USAGE="Usage: jobmanager.sh ((start|start-foreground) (local|cluster) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
EXECUTIONMODE=$2
Expand All @@ -30,7 +30,7 @@ bin=`cd "$bin"; pwd`

. "$bin"/config.sh

if [[ $STARTSTOP == "start" ]]; then
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
if [ -z $EXECUTIONMODE ]; then
echo "Missing execution mode (local|cluster) argument. $USAGE."
exit 1
Expand Down Expand Up @@ -70,4 +70,8 @@ if [[ $STARTSTOP == "start" ]]; then
fi
fi

"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}"
if [[ $STARTSTOP == "start-foreground" ]]; then
"${FLINK_BIN_DIR}"/flink-console.sh jobmanager "${args[@]}"
else
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP jobmanager "${args[@]}"
fi
42 changes: 23 additions & 19 deletions flink-dist/src/main/flink-bin/bin/taskmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
################################################################################

# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|stop|stop-all)"
USAGE="Usage: taskmanager.sh start|start-foreground|stop|stop-all)"

STARTSTOP=$1

Expand All @@ -27,7 +27,7 @@ bin=`cd "$bin"; pwd`

. "$bin"/config.sh

if [[ $STARTSTOP == "start" ]]; then
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

# if memory allocation mode is lazy and no other JVM options are set,
# set the 'Concurrent Mark Sweep GC'
Expand Down Expand Up @@ -96,22 +96,26 @@ if [[ $STARTSTOP == "start" ]]; then
args=("--configDir" "${FLINK_CONF_DIR}")
fi

TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}"

if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
$TM_COMMAND
if [[ $STARTSTOP == "start-foreground" ]]; then
"${FLINK_BIN_DIR}"/flink-console.sh taskmanager "${args[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND
done
TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}"

if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
$TM_COMMAND
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND
done
fi
fi
10 changes: 7 additions & 3 deletions flink-dist/src/main/flink-bin/bin/zookeeper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
################################################################################

# Start/stop a ZooKeeper quorum peer.
USAGE="Usage: zookeeper.sh (start peer-id|stop|stop-all)"
USAGE="Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all"

STARTSTOP=$1
PEER_ID=$2
Expand All @@ -34,7 +34,7 @@ if [ ! -f $ZK_CONF ]; then
exit 1
fi

if [[ $STARTSTOP == "start" ]]; then
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
if [ -z $PEER_ID ]; then
echo "[ERROR] Missing peer id argument. $USAGE."
exit 1
Expand All @@ -53,4 +53,8 @@ if [[ $STARTSTOP == "start" ]]; then
args=("--zkConfigFile" "${ZK_CONF}" "--peerId" "${PEER_ID}")
fi

"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}"
if [[ $STARTSTOP == "start-foreground" ]]; then
"${FLINK_BIN_DIR}"/flink-console.sh zookeeper "${args[@]}"
else
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}"
fi
39 changes: 39 additions & 0 deletions flink-dist/src/main/flink-bin/conf/log4j-console.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
56 changes: 56 additions & 0 deletions flink-dist/src/main/flink-bin/conf/logback-console.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>

<!-- This affects logging for both user code and Flink -->
<root level="INFO">
<appender-ref ref="console"/>
</root>

<!-- Uncomment this if you want to only change Flink's logging -->
<!--<logger name="org.apache.flink" level="INFO">-->
<!--<appender-ref ref="console"/>-->
<!--</logger>-->

<!-- The following lines keep the log level of common libraries/connectors on
log level INFO. The root logger does not override this. You have to manually
change the log levels here. -->
<logger name="akka" level="INFO">
<appender-ref ref="console"/>
</logger>
<logger name="org.apache.kafka" level="INFO">
<appender-ref ref="console"/>
</logger>
<logger name="org.apache.hadoop" level="INFO">
<appender-ref ref="console"/>
</logger>
<logger name="org.apache.zookeeper" level="INFO">
<appender-ref ref="console"/>
</logger>

<!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
<logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
<appender-ref ref="console"/>
</logger>
</configuration>

0 comments on commit 338c30a

Please sign in to comment.