Skip to content

Commit

Permalink
[FLINK-33721][core] Extend BashJavaUtils to support reading and writi…
Browse files Browse the repository at this point in the history
…ng standard yaml file.

This closes apache#24091.
  • Loading branch information
JunRuiLee authored and zhuzhurk committed Jan 23, 2024
1 parent 1f7622d commit c148b62
Show file tree
Hide file tree
Showing 14 changed files with 945 additions and 150 deletions.
170 changes: 170 additions & 0 deletions flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
JAVA_RUN=java
else
if [[ -d "$JAVA_HOME" ]]; then
JAVA_RUN="$JAVA_HOME"/bin/java
else
JAVA_RUN=java
fi
fi

manglePathList() {
UNAME=$(uname -s)
# a path list, for example a java classpath
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
echo `cygpath -wp "$1"`
else
echo $1
fi
}

findFlinkDistJar() {
local FLINK_DIST
local LIB_DIR
if [[ -n "$1" ]]; then
LIB_DIR="$1"
else
LIB_DIR="$FLINK_LIB_DIR"
fi
FLINK_DIST="$(find "$LIB_DIR" -name 'flink-dist*.jar')"
local FLINK_DIST_COUNT
FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"

# If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
# as the classpath and exit function with empty classpath to force process failure
if [[ "$FLINK_DIST" == "" ]]; then
(>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
exit 1
elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
(>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.")
exit 1
fi

echo "$FLINK_DIST"
}

runBashJavaUtilsCmd() {
local cmd=$1
local conf_dir=$2
local class_path=$3
local dynamic_args=${@:4}
class_path=`manglePathList "${class_path}"`

local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
if [[ $? -ne 0 ]]; then
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
# Print the output in case the user redirect the log to console.
echo "$output" 1>&2
exit 1
fi

echo "$output"
}

updateAndGetFlinkConfiguration() {
local FLINK_CONF_DIR="$1"
local FLINK_BIN_DIR="$2"
local FLINK_LIB_DIR="$3"
local command_result
command_result=$(parseConfigurationAndExportLogs "$FLINK_CONF_DIR" "$FLINK_BIN_DIR" "$FLINK_LIB_DIR" "UPDATE_AND_GET_FLINK_CONFIGURATION" "${@:4}")
echo "$command_result"
}

parseConfigurationAndExportLogs() {
local FLINK_CONF_DIR="$1"
local FLINK_BIN_DIR="$2"
local FLINK_LIB_DIR="$3"
local COMMAND="$4"
local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

java_utils_output=$(runBashJavaUtilsCmd "${COMMAND}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar ${FLINK_LIB_DIR})" "${@:5}")
logging_output=$(extractLoggingOutputs "${java_utils_output}")
execution_results=$(echo "${java_utils_output}" | grep ${EXECUTION_PREFIX})

if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not parse configurations properly."
echo "[ERROR] Raw output from BashJavaUtils:"
echo "$java_utils_output"
exit 1
fi

echo "${execution_results//${EXECUTION_PREFIX}/}"
}

extractLoggingOutputs() {
local output="$1"
local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

echo "${output}" | grep -v ${EXECUTION_PREFIX}
}

extractExecutionResults() {
local output="$1"
local expected_lines="$2"
local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
local execution_results
local num_lines

execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
num_lines=$(echo "${execution_results}" | wc -l)
# explicit check for empty result, because if execution_results is empty, then wc returns 1
if [[ -z ${execution_results} ]]; then
echo "[ERROR] The execution result is empty." 1>&2
exit 1
fi
if [[ ${num_lines} -ne ${expected_lines} ]]; then
echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
echo "$output" 1>&2
exit 1
fi

echo "${execution_results//${EXECUTION_PREFIX}/}"
}

parseResourceParamsAndExportLogs() {
local cmd=$1
java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
logging_output=$(extractLoggingOutputs "${java_utils_output}")
params_output=$(extractExecutionResults "${java_utils_output}" 2)

if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
echo "[ERROR] Raw output from BashJavaUtils:"
echo "$java_utils_output"
exit 1
fi

jvm_params=$(echo "${params_output}" | head -n1)
export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)

export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS
RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $DYNAMIC_PARAMETERS
logs: $logging_output
"
}
44 changes: 44 additions & 0 deletions flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

USAGE="Usage: config-parser-utils.sh FLINK_CONF_DIR FLINK_BIN_DIR FLINK_LIB_DIR [dynamic args...]"

if [ "$#" -lt 3 ]; then
echo "$USAGE"
exit 1
fi

source "$2"/bash-java-utils.sh

ARGS=("${@:1}")
result=$(updateAndGetFlinkConfiguration "${ARGS[@]}")

if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get configurations properly, the result is :"
echo "$result"
exit 1
fi

CONF_FILE="$1/flink-conf.yaml"
if [ ! -e "$1/flink-conf.yaml" ]; then
CONF_FILE="$1/config.yaml"
fi;

# Output the result
echo "${result}" > "$CONF_FILE";
Loading

0 comments on commit c148b62

Please sign in to comment.