From c90d99fe2dd2a78214c1f16f6052ee17e30afd84 Mon Sep 17 00:00:00 2001 From: Luning Wang Date: Mon, 4 Jul 2022 10:05:47 +0800 Subject: [PATCH] [FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-all gateway --- flink-dist/src/main/assemblies/bin.xml | 7 ++ flink-dist/src/main/assemblies/opt.xml | 6 ++ flink-dist/src/main/flink-bin/bin/config.sh | 19 +++++ .../src/main/flink-bin/bin/flink-console.sh | 9 +- .../src/main/flink-bin/bin/flink-daemon.sh | 9 +- .../tests/util/flink/FlinkDistribution.java | 12 +++ .../flink/LocalStandaloneFlinkResource.java | 5 ++ .../flink-sql-gateway-test/pom.xml | 84 +++++++++++++++++++ .../flink/table/gateway/SQLGatewayITCase.java | 71 ++++++++++++++++ flink-end-to-end-tests/pom.xml | 3 +- .../flink-sql-gateway/bin/sql-gateway.sh | 84 +++++++++++++++++++ 11 files changed, 304 insertions(+), 5 deletions(-) create mode 100644 flink-end-to-end-tests/flink-sql-gateway-test/pom.xml create mode 100644 flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java create mode 100644 flink-table/flink-sql-gateway/bin/sql-gateway.sh diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index dcbac513857ff..52172fe50a74d 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -172,6 +172,13 @@ under the License. 0755 + + + ../flink-table/flink-sql-gateway/bin/ + bin + 0755 + + src/main/flink-bin/yarn-bin diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index c56ac25938055..cdb57563ecc2e 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -65,6 +65,12 @@ flink-sql-client-${project.version}.jar 0644 + + ../flink-table/flink-sql-gateway/target/flink-sql-gateway-${project.version}.jar + opt/ + flink-sql-gateway-${project.version}.jar + 0644 + diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index e8b1e2a818aa7..f136f963c9b6f 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -66,6 +66,25 @@ findFlinkDistJar() { echo "$FLINK_DIST" } +findSqlGatewayJar() { + local SQL_GATEWAY + SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')" + local SQL_GATEWAY_COUNT + SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)" + + # If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored + # as the classpath and exit function with empty classpath to force process failure + if [[ "$SQL_GATEWAY" == "" ]]; then + (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_OPT_DIR.") + exit 1 + elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then + (>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.") + exit 1 + fi + + echo "$SQL_GATEWAY" +} + # These are used to mangle paths that are passed to java when using # cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere # but the windows java version expects them in Windows Format, i.e. C:\bla\blub. diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index 2c84e60cc193b..05b9d42632eaf 100755 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -19,7 +19,7 @@ # Start a Flink service as a console application. Must be stopped with Ctrl-C # or with SIGTERM by kill or the controlling process. -USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]" +USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]" SERVICE=$1 ARGS=("${@:2}") # get remaining arguments as array @@ -62,6 +62,11 @@ case $SERVICE in CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner ;; + (sqlgateway) + CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway + SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar` + ;; + (*) echo "Unknown service '${SERVICE}'. $USAGE." exit 1 @@ -111,4 +116,4 @@ echo $$ >> "$pid" 2>/dev/null # Evaluate user options for local variable expansion FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) -exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" +exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index c6d54420b1d70..246b540a9721d 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -18,7 +18,7 @@ ################################################################################ # Start/stop a Flink daemon. -USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]" +USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]" STARTSTOP=$1 DAEMON=$2 @@ -50,6 +50,11 @@ case $DAEMON in CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint ;; + (sqlgateway) + CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway + SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar` + ;; + (*) echo "Unknown daemon '${DAEMON}'. $USAGE." exit 1 @@ -131,7 +136,7 @@ case $STARTSTOP in FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) echo "Starting $DAEMON daemon on host $HOSTNAME." - "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null & + "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null & mypid=$! diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index 93c34a2299355..c699adaeb8fc9 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -106,6 +106,18 @@ public void startTaskManager() throws IOException { bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start"); } + public void startSQLGateway(String arg) throws IOException { + LOG.info("Starting Flink SQL Gateway."); + AutoClosableProcess.runBlocking( + bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg); + } + + public void stopSQLGateway() throws IOException { + LOG.info("Stopping Flink SQL Gateway."); + AutoClosableProcess.runBlocking( + bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop"); + } + public void setRootLogLevel(Level logLevel) throws IOException { FileUtils.replace( conf.resolve("log4j.properties"), diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java index a7fc0f22555b5..dc767d382e0af 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java @@ -113,6 +113,7 @@ public void afterTestFailure() { private void shutdownCluster() { try { distribution.stopFlinkCluster(); + distribution.stopSQLGateway(); } catch (IOException e) { LOG.warn("Error while shutting down Flink cluster.", e); } @@ -183,6 +184,10 @@ public ClusterController startCluster(int numTaskManagers) throws IOException { throw new RuntimeException("Cluster did not start in expected time-frame."); } + public void startSQLGateway(String arg) throws IOException { + distribution.startSQLGateway(arg); + } + @Override public Stream searchAllLogs(Pattern pattern, Function matchProcessor) throws IOException { diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml new file mode 100644 index 0000000000000..3b8cc597bb2d0 --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/pom.xml @@ -0,0 +1,84 @@ + + + + flink-end-to-end-tests + org.apache.flink + 1.16-SNAPSHOT + + 4.0.0 + + flink-sql-gateway-test + Flink : E2E Tests : SQL Gateway + jar + + + + org.apache.flink + flink-end-to-end-tests-common + ${project.version} + + + org.apache.flink + flink-sql-connector-hive-${hive.version}_${scala.binary.version} + ${project.version} + + + org.apache.hive + hive-jdbc + ${hive.version} + + + org.apache.flink + flink-test-utils-junit + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + pre-integration-test + + copy + + + + + + + org.apache.flink + flink-sql-connector-hive-2.3.9_${scala.binary.version} + ${project.version} + flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar + jar + ${project.build.directory}/dependencies + + + + + + + diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java new file mode 100644 index 0000000000000..5b037b1ccae7d --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SQLGatewayITCase.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway; + +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.ClusterController; +import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceSetup; +import org.apache.flink.tests.util.flink.JarLocation; +import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource; +import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; + +import java.nio.file.Path; +import java.sql.DriverManager; +import java.sql.SQLException; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SQLGatewayITCase extends TestLogger { + + private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl"; + private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"; + + private static final Path HIVE_SQL_CONNECOTR_JAR = + TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar"); + + @Rule + public final FlinkResource flink = + new LocalStandaloneFlinkResourceFactory() + .create( + FlinkResourceSetup.builder() + .addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB) + .build()); + + @Test + public void testGateway() throws Exception { + try (ClusterController clusterController = flink.startCluster(1)) { + ((LocalStandaloneFlinkResource) flink) + .startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2"); + Thread.sleep(2000); + Class.forName(DRIVER_NAME); + try { + DriverManager.getConnection(JDBC_URL); + } catch (SQLException e) { + assertThat(e.getMessage()) + .contains( + "Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris"); + } + } + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 5fbcaf181f369..31bd7facf9a1e 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -83,7 +83,8 @@ under the License. flink-end-to-end-tests-elasticsearch7 flink-end-to-end-tests-common-elasticsearch flink-end-to-end-tests-sql - + flink-sql-gateway-test + diff --git a/flink-table/flink-sql-gateway/bin/sql-gateway.sh b/flink-table/flink-sql-gateway/bin/sql-gateway.sh new file mode 100644 index 0000000000000..f925a25adcf38 --- /dev/null +++ b/flink-table/flink-sql-gateway/bin/sql-gateway.sh @@ -0,0 +1,84 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Start/stop a Flink SQL Gateway. + +function usage() { + echo "Usage: bin/sql-gateway.sh command" + echo " commands:" + echo " start - Run a SQL Gateway as a daemon" + echo " start-foreground - Run a SQL Gateway as a console application" + echo " stop - Stop the SQL Gateway daemon" + echo " stop-all - Stop all the SQL Gateway daemons" + echo " -h | --help - Show this help message" +} + +if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then + usage + exit 0 +fi + +STARTSTOP=$1 + +if [ -z "$STARTSTOP" ]; then + STARTSTOP="start" +fi + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then + usage + exit 1 +fi + +################################################################################ +# Adopted from "flink" bash script +################################################################################ + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. +iteration=0 +while [ -L "$target" ]; do + if [ "$iteration" -gt 100 ]; then + echo "Cannot resolve path: You have a cyclic symlink in $target." + break + fi + ls=`ls -ld -- "$target"` + target=`expr "$ls" : '.* -> \(.*\)$'` + iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path +bin=`dirname "$target"` + +# get flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +ENTRYPOINT=sqlgateway + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}" +fi