[FLINK-29704][runtime][security] E2E test for delegation token framework
gaborgsomogyi authored Nov 16, 2022
1 parent 39f0e9b commit 4a0f283
Showing 5 changed files with 161 additions and 101 deletions.
191 changes: 115 additions & 76 deletions flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
@@ -22,91 +22,95 @@ source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_docker.sh
source "$(dirname "$0")"/common_artifact_download_cacher.sh

FLINK_TARBALL_DIR=$TEST_DATA_DIR
FLINK_TARBALL_DIR=${TEST_DATA_DIR}
FLINK_TARBALL=flink.tar.gz
FLINK_DIRNAME=$(basename $FLINK_DIR)
FLINK_DIRNAME=$(basename "${FLINK_DIR}")

MAX_RETRY_SECONDS=120
CLUSTER_SETUP_RETRIES=3
IMAGE_BUILD_RETRIES=5

echo "Flink Tarball directory $FLINK_TARBALL_DIR"
echo "Flink tarball filename $FLINK_TARBALL"
echo "Flink distribution directory name $FLINK_DIRNAME"
echo "End-to-end directory $END_TO_END_DIR"
echo "Flink Tarball directory ${FLINK_TARBALL_DIR}"
echo "Flink tarball filename ${FLINK_TARBALL}"
echo "Flink distribution directory name ${FLINK_DIRNAME}"
echo "End-to-end directory ${END_TO_END_DIR}"

start_time=$(date +%s)

# make sure we stop our cluster at the end
# Make sure we stop our cluster at the end
function cluster_shutdown {
if [ $TRAPPED_EXIT_CODE != 0 ];then
if [ ${TRAPPED_EXIT_CODE} != 0 ];then
debug_copy_and_show_logs
fi
docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml down
rm $FLINK_TARBALL_DIR/$FLINK_TARBALL
docker-compose -f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml" down
rm "${FLINK_TARBALL_DIR}/${FLINK_TARBALL}"
}
on_exit cluster_shutdown

function start_hadoop_cluster() {
echo "Starting Hadoop cluster"
docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
docker-compose -f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml" up -d

# wait for kerberos to be set up
# Wait for kerberos to be set up
local start_time
start_time=$(date +%s)
until docker logs master 2>&1 | grep -q "Finished master initialization"; do
local current_time
current_time=$(date +%s)
time_diff=$((current_time - start_time))
local time_diff=$((current_time - start_time))

if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
if [ ${time_diff} -ge ${MAX_RETRY_SECONDS} ]; then
return 1
else
echo "Waiting for hadoop cluster to come up. We have been trying for $time_diff seconds, retrying ..."
echo "Waiting for hadoop cluster to come up. We have been trying for ${time_diff} seconds, retrying ..."
sleep 5
fi
done

# perform health checks
# Perform health checks
containers_health_check "master" "worker1" "worker2" "kdc"

# try and see if NodeManagers are up, otherwise the Flink job will not have enough resources
# to run
nm_running="0"
# Try and see if NodeManagers are up, otherwise the Flink job will not have enough resources to run
local nm_running="0"
local start_time
start_time=$(date +%s)
while [ "$nm_running" -lt "2" ]; do
while [ "${nm_running}" -lt "2" ]; do
local current_time
current_time=$(date +%s)
time_diff=$((current_time - start_time))
local time_diff=$((current_time - start_time))

if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
if [ ${time_diff} -ge ${MAX_RETRY_SECONDS} ]; then
return 1
else
echo "We only have $nm_running NodeManagers up. We have been trying for $time_diff seconds, retrying ..."
echo "We only have ${nm_running} NodeManagers up. We have been trying for ${time_diff} seconds, retrying ..."
sleep 1
fi

docker exec master bash -c "kinit -kt /home/hadoop-user/hadoop-user.keytab hadoop-user"
nm_running=`docker exec master bash -c "yarn node -list" | grep RUNNING | wc -l`
docker exec master bash -c "kdestroy"
docker_kinit master
nm_running=$(docker exec master bash -c "yarn node -list" | grep -c RUNNING)
docker_kdestroy master
done

echo "We now have $nm_running NodeManagers up."
echo "We now have ${nm_running} NodeManagers up."

return 0
}

function build_image() {
echo "Predownloading Hadoop tarball"
echo "Pre-downloading Hadoop tarball"
local cache_path
cache_path=$(get_artifact "http://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz")
ln "$cache_path" "$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/hadoop-2.8.5.tar.gz"
ln "${cache_path}" "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/hadoop-2.8.5.tar.gz"

echo "Building Hadoop Docker container"
docker build --build-arg HADOOP_VERSION=2.8.5 \
-f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile \
-f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/Dockerfile" \
-t flink/docker-hadoop-secure-cluster:latest \
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/
"${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/"
}

function start_hadoop_cluster_and_prepare_flink() {
if ! retry_times $IMAGE_BUILD_RETRIES 2 build_image; then
if ! retry_times ${IMAGE_BUILD_RETRIES} 2 build_image; then
echo "ERROR: Could not build hadoop image. Aborting..."
exit 1
fi
@@ -115,50 +119,47 @@ function start_hadoop_cluster_and_prepare_flink() {
exit 1
fi

mkdir -p $FLINK_TARBALL_DIR
tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
mkdir -p "${FLINK_TARBALL_DIR}"
tar czf "${FLINK_TARBALL_DIR}/${FLINK_TARBALL}" -C "$(dirname "${FLINK_DIR}")" .

docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/
docker cp "${FLINK_TARBALL_DIR}/${FLINK_TARBALL}" master:/home/hadoop-user/

# now, at least the container is ready
docker exec master bash -c "tar xzf /home/hadoop-user/$FLINK_TARBALL --directory /home/hadoop-user/"
# Now, at least the container is ready
docker exec master bash -c "tar xzf /home/hadoop-user/${FLINK_TARBALL} --directory /home/hadoop-user/"

# minimal Flink config, bebe
# Minimal Flink config, bebe
FLINK_CONFIG=$(cat << END
security.kerberos.login.keytab: /home/hadoop-user/hadoop-user.keytab
security.kerberos.login.principal: hadoop-user
slot.request.timeout: 120000
END
)
docker exec master bash -c "echo \"$FLINK_CONFIG\" > /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
docker exec master bash -c "echo \"${FLINK_CONFIG}\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"

echo "Flink config:"
docker exec master bash -c "cat /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
docker exec master bash -c "cat /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
}

function debug_copy_and_show_logs {
echo "Debugging failed YARN Docker test:"
echo -e "\nCurrently running containers"
docker ps

echo -e "\n\nCurrently running JVMs"
echo -e "\nCurrently running JVMs"
jps -v

local log_directory="$TEST_DATA_DIR/logs"
local yarn_docker_containers="master $(docker ps --format '{{.Names}}' | grep worker)"

extract_hadoop_logs ${log_directory} ${yarn_docker_containers}
print_logs ${log_directory}
local log_directory="${TEST_DATA_DIR}/logs"
local yarn_docker_containers
yarn_docker_containers="master $(docker ps --format '{{.Names}}' | grep worker)"

echo -e "\n\n ==== Flink logs ===="
docker exec master bash -c "kinit -kt /home/hadoop-user/hadoop-user.keytab hadoop-user"
docker exec master bash -c "yarn application -list -appStates ALL"
application_id=`docker exec master bash -c "yarn application -list -appStates ALL" | grep -i "Flink" | grep -i "cluster" | awk '{print \$1}'`
extract_hadoop_logs "${log_directory}" "${yarn_docker_containers}"
print_logs "${log_directory}"

echo -e "\n\nApplication ID: '$application_id'"
docker exec master bash -c "yarn logs -applicationId $application_id"

docker exec master bash -c "kdestroy"
local yarn_application_logs
yarn_application_logs=$(get_yarn_application_logs)
echo -e "\n==== YARN application logs begin ===="
echo "${yarn_application_logs}"
echo -e "\n==== YARN application logs end ====\n"
}

function extract_hadoop_logs() {
@@ -182,55 +183,93 @@ function print_logs() {
local parent_folder="$1"

ls -lisahR "${parent_folder}"
find "${parent_folder}" -type f -exec echo -e "\n\nContent of {}:" \; -exec cat {} \;
find "${parent_folder}" -type f -exec echo -e "\nContent of {}:" \; -exec cat {} \;
}

# expects only one application to be running and waits until this one is in
# Expects only one application to be running and waits until this one is in
# final state SUCCEEDED
function wait_for_single_yarn_application {

docker exec master bash -c "kinit -kt /home/hadoop-user/hadoop-user.keytab hadoop-user"

# find our application ID
docker exec master bash -c "yarn application -list -appStates ALL"
application_id=$(docker exec master bash -c "yarn application -list -appStates ALL" | grep "Flink Application" | awk '{print $1}')
application_id=$(get_yarn_application_id)

echo "Application ID: $application_id"
docker_kinit master

# wait for the application to finish successfully
# Wait for the application to finish successfully
start_time=$(date +%s)
application_state="UNDEFINED"
while [[ $application_state != "FINISHED" ]]; do
while [[ ${application_state} != "FINISHED" ]]; do
current_time=$(date +%s)
time_diff=$((current_time - start_time))

if [[ $time_diff -ge $MAX_RETRY_SECONDS ]]; then
echo "Application $application_id is in state $application_state and we have waited too long, quitting..."
echo "Application ${application_id} is in state ${application_state} and we have waited too long, quitting..."
exit 1
else
echo "Application $application_id is in state $application_state. We have been waiting for $time_diff seconds, looping ..."
echo "Application ${application_id} is in state ${application_state}. We have been waiting for ${time_diff} seconds, looping ..."
sleep 1
fi

application_state=$(docker exec master bash -c "yarn application -status $application_id" | grep "\sState" | sed 's/.*State : \(\w*\)/\1/')
application_state=$(docker exec master bash -c "yarn application -status ${application_id}" | grep "\sState" | sed 's/.*State : \(\w*\)/\1/')
done

final_application_state=$(docker exec master bash -c "yarn application -status $application_id" | grep "\sFinal-State" | sed 's/.*Final-State : \(\w*\)/\1/')
final_application_state=$(docker exec master bash -c "yarn application -status ${application_id}" | grep "\sFinal-State" | sed 's/.*Final-State : \(\w*\)/\1/')

echo "Final Application State: $final_application_state"
echo "Final Application State: ${final_application_state}"

if [[ $final_application_state != "SUCCEEDED" ]]; then
if [[ ${final_application_state} != "SUCCEEDED" ]]; then
echo "Running the Flink Application failed. 😞"
exit 1
fi

docker exec master bash -c "kdestroy"
docker_kdestroy master
}

function get_output {
docker exec master bash -c "kinit -kt /home/hadoop-user/hadoop-user.keytab hadoop-user"
docker exec master bash -c "hdfs dfs -ls $1"
OUTPUT=$(docker exec master bash -c "hdfs dfs -cat $1")
docker exec master bash -c "kdestroy"
echo "$OUTPUT"
echo "Getting output" >&2

docker_kinit master

docker exec master bash -c "hdfs dfs -ls -R $1"
local output
output=$(docker exec master bash -c "hdfs dfs -cat $1")

docker_kdestroy master

echo "${output}"
}

function get_yarn_application_id {
echo "Getting YARN application id" >&2

docker_kinit master

local application_id
application_id=$(docker exec master bash -c "yarn application -list -appStates ALL" | grep "Flink" | awk '{print $1}')
echo "YARN application ID: $application_id" >&2

docker_kdestroy master

echo "${application_id}"
}

function get_yarn_application_logs {
echo -e "Getting YARN application logs" >&2

local application_id
application_id=$(get_yarn_application_id)

local logs
docker_kinit master
logs=$(docker exec master bash -c "yarn logs -applicationId ${application_id}")
docker_kdestroy master

echo "${logs}"
}

function docker_kinit {
docker exec "$1" bash -c "kinit -kt /home/hadoop-user/hadoop-user.keytab hadoop-user"
}

function docker_kdestroy {
docker exec "$1" bash -c "kdestroy"
}
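
The refactoring above replaces the inline kinit/kdestroy calls in common_yarn_docker.sh with the docker_kinit and docker_kdestroy helpers and adds get_yarn_application_id and get_yarn_application_logs. A minimal sketch of how a test script could compose these helpers to check that the delegation token framework kicked in (illustrative only, not part of the commit; it assumes the master container and the hadoop-user keytab provisioned by this test suite):

#!/usr/bin/env bash
# Illustrative sketch, not part of the commit. Assumes the "master" container
# and /home/hadoop-user/hadoop-user.keytab provisioned by this test suite.
source "$(dirname "$0")"/common_yarn_docker.sh

# get_yarn_application_logs performs its own kinit/kdestroy via docker_kinit
# and docker_kdestroy and returns the aggregated YARN logs of the Flink application.
logs=$(get_yarn_application_logs)

if [[ "${logs}" =~ "Receive initial delegation tokens from resource manager" ]]; then
    echo "Delegation token framework is active"
else
    echo "YARN logs do not contain the delegation token message"
    exit 1
fi

The remaining hunks below belong to the YARN-on-Docker Kerberos test script and wire these helpers into the end-to-end assertions.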
@@ -36,7 +36,7 @@ case $INPUT_TYPE in
INPUT_ARGS="--input dummy://localhost/words --input anotherDummy://localhost/words"
;;
(*)
echo "Unknown input type $INPUT_TYPE"
echo "Unknown input type ${INPUT_TYPE}"
exit 1
;;
esac
@@ -50,14 +50,14 @@ OUTPUT_PATH=hdfs:///user/hadoop-user/wc-out-$RANDOM
# it's important to run this with higher parallelism, otherwise we might risk that
# JM and TM are on the same YARN node and that we therefore don't test the keytab shipping
if docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
/home/hadoop-user/$FLINK_DIRNAME/bin/flink run-application \
/home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \
-t yarn-application \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dtaskmanager.memory.process.size=1000m \
-Djobmanager.memory.process.size=1000m \
-Dparallelism.default=3 \
-Dtaskmanager.memory.jvm-metaspace.size=128m \
/home/hadoop-user/$FLINK_DIRNAME/examples/streaming/WordCount.jar $INPUT_ARGS --output $OUTPUT_PATH";
/home/hadoop-user/${FLINK_DIRNAME}/examples/streaming/WordCount.jar ${INPUT_ARGS} --output ${OUTPUT_PATH}";
then
echo "Flink YARN Application submitted."
else
@@ -69,26 +69,36 @@ wait_for_single_yarn_application

# now we should have the application output ready

OUTPUT=$(get_output "$OUTPUT_PATH/*")
echo "$OUTPUT"
OUTPUT=$(get_output "${OUTPUT_PATH}/*/*")
echo "==== OUTPUT_BEGIN ===="
echo "${OUTPUT}"
echo "==== OUTPUT_END ===="

YARN_APPLICATION_LOGS=$(get_yarn_application_logs)
if [[ ! "${YARN_APPLICATION_LOGS}" =~ "Receive initial delegation tokens from resource manager" ]]; then
echo "YARN logs does not contain delegation token usage message as required"
exit 1
fi

echo "Running Job without configured keytab, the exception you see below is expected"
docker exec master bash -c "echo \"\" > /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
# verify that it doesn't work if we don't configure a keytab
docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
/home/hadoop-user/$FLINK_DIRNAME/bin/flink run-application \
/home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \
-t yarn-application \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dtaskmanager.memory.process.size=1000m \
-Djobmanager.memory.process.size=1000m \
-Dparallelism.default=3 \
-Dtaskmanager.memory.jvm-metaspace.size=128m \
/home/hadoop-user/$FLINK_DIRNAME/examples/streaming/WordCount.jar --output $OUTPUT_PATH" > stderrAndstdoutFile 2>&1
OUTPUT=$(cat stderrAndstdoutFile)
/home/hadoop-user/${FLINK_DIRNAME}/examples/streaming/WordCount.jar --output ${OUTPUT_PATH}" > stderrAndstdoutFile 2>&1
STD_ERR_AND_STD_OUT=$(cat stderrAndstdoutFile)
rm stderrAndstdoutFile
echo "$OUTPUT"
echo "==== STD_ERR_AND_STD_OUT_BEGIN ===="
echo "${STD_ERR_AND_STD_OUT}"
echo "==== STD_ERR_AND_STD_OUT_END ===="

if [[ ! "$OUTPUT" =~ "Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials" ]]; then
if [[ ! "${STD_ERR_AND_STD_OUT}" =~ "Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials" ]]; then
echo "Output does not contain the Kerberos error message as required"
exit 1
fi
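
The keytab-less submission above is asserted with a simple capture-and-match pattern: stdout and stderr are funneled into one variable, echoed between markers for debuggability, and checked for the expected Kerberos error. A minimal standalone sketch of that pattern (illustrative only; some_kerberized_submit is a placeholder standing in for the flink run-application call above):

# Illustrative sketch, not part of the commit; "some_kerberized_submit" is a
# placeholder for the keytab-less submission command.
output=$(some_kerberized_submit 2>&1) || true   # the submission is expected to fail

echo "==== STD_ERR_AND_STD_OUT_BEGIN ===="
echo "${output}"
echo "==== STD_ERR_AND_STD_OUT_END ===="

if [[ ! "${output}" =~ "Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials" ]]; then
    echo "Output does not contain the Kerberos error message as required"
    exit 1
fi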