Skip to content

Commit

Permalink
[FLINK-27770][sql-gateway] Introduce the script to start/stop/stop-al…
Browse files Browse the repository at this point in the history
…l gateway
  • Loading branch information
a49a authored and fsk119 committed Aug 3, 2022
1 parent 69df7a4 commit c90d99f
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 5 deletions.
7 changes: 7 additions & 0 deletions flink-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ under the License.
<fileMode>0755</fileMode>
</fileSet>

<!-- copy SQL gateway -->
<fileSet>
<directory>../flink-table/flink-sql-gateway/bin/</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>

<!-- copy yarn start scripts -->
<fileSet>
<directory>src/main/flink-bin/yarn-bin</directory>
Expand Down
6 changes: 6 additions & 0 deletions flink-dist/src/main/assemblies/opt.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<destName>flink-sql-client-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>
<file>
<source>../flink-table/flink-sql-gateway/target/flink-sql-gateway-${project.version}.jar</source>
<outputDirectory>opt/</outputDirectory>
<destName>flink-sql-gateway-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

<!-- State Processor API -->
<file>
Expand Down
19 changes: 19 additions & 0 deletions flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ findFlinkDistJar() {
echo "$FLINK_DIST"
}

findSqlGatewayJar() {
local SQL_GATEWAY
SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
local SQL_GATEWAY_COUNT
SQL_GATEWAY_COUNT="$(echo "$SQL_GATEWAY" | wc -l)"

# If flink-sql-gateway*.jar cannot be resolved write error messages to stderr since stdout is stored
# as the classpath and exit function with empty classpath to force process failure
if [[ "$SQL_GATEWAY" == "" ]]; then
(>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_OPT_DIR.")
exit 1
elif [[ "$SQL_GATEWAY_COUNT" -gt 1 ]]; then
(>&2 echo "[ERROR] Multiple flink-sql-gateway*.jar found in $FLINK_OPT_DIR. Please resolve.")
exit 1
fi

echo "$SQL_GATEWAY"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
Expand Down
9 changes: 7 additions & 2 deletions flink-dist/src/main/flink-bin/bin/flink-console.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager|sqlgateway) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array
Expand Down Expand Up @@ -62,6 +62,11 @@ case $SERVICE in
CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
;;

(sqlgateway)
CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
;;

(*)
echo "Unknown service '${SERVICE}'. $USAGE."
exit 1
Expand Down Expand Up @@ -111,4 +116,4 @@ echo $$ >> "$pid" 2>/dev/null
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
9 changes: 7 additions & 2 deletions flink-dist/src/main/flink-bin/bin/flink-daemon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
################################################################################

# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|sqlgateway) [args]"

STARTSTOP=$1
DAEMON=$2
Expand Down Expand Up @@ -50,6 +50,11 @@ case $DAEMON in
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;

(sqlgateway)
CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
SQL_GATEWAY_CLASSPATH=`findSqlGatewayJar`
;;

(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
Expand Down Expand Up @@ -131,7 +136,7 @@ case $STARTSTOP in
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
"$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$SQL_GATEWAY_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

mypid=$!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ public void startTaskManager() throws IOException {
bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
}

public void startSQLGateway(String arg) throws IOException {
LOG.info("Starting Flink SQL Gateway.");
AutoClosableProcess.runBlocking(
bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "start", arg);
}

public void stopSQLGateway() throws IOException {
LOG.info("Stopping Flink SQL Gateway.");
AutoClosableProcess.runBlocking(
bin.resolve("sql-gateway.sh").toAbsolutePath().toString(), "stop");
}

public void setRootLogLevel(Level logLevel) throws IOException {
FileUtils.replace(
conf.resolve("log4j.properties"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void afterTestFailure() {
private void shutdownCluster() {
try {
distribution.stopFlinkCluster();
distribution.stopSQLGateway();
} catch (IOException e) {
LOG.warn("Error while shutting down Flink cluster.", e);
}
Expand Down Expand Up @@ -183,6 +184,10 @@ public ClusterController startCluster(int numTaskManagers) throws IOException {
throw new RuntimeException("Cluster did not start in expected time-frame.");
}

public void startSQLGateway(String arg) throws IOException {
distribution.startSQLGateway(arg);
}

@Override
public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
throws IOException {
Expand Down
84 changes: 84 additions & 0 deletions flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.16-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-sql-gateway-test</artifactId>
<name>Flink : E2E Tests : SQL Gateway</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-end-to-end-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>pre-integration-test</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.9_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<destFileName>flink-sql-connector-hive-2.3.9_${scala.binary.version}-${project.version}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway;

import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.JarLocation;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;

import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;

import static org.assertj.core.api.Assertions.assertThat;

public class SQLGatewayITCase extends TestLogger {

private static String JDBC_URL = "jdbc:hive2://localhost:8084/default;auth=noSasl";
private static String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

private static final Path HIVE_SQL_CONNECOTR_JAR =
TestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");

@Rule
public final FlinkResource flink =
new LocalStandaloneFlinkResourceFactory()
.create(
FlinkResourceSetup.builder()
.addJar(HIVE_SQL_CONNECOTR_JAR, JarLocation.LIB)
.build());

@Test
public void testGateway() throws Exception {
try (ClusterController clusterController = flink.startCluster(1)) {
((LocalStandaloneFlinkResource) flink)
.startSQLGateway("-Dsql-gateway.endpoint.type=hiveserver2");
Thread.sleep(2000);
Class.forName(DRIVER_NAME);
try {
DriverManager.getConnection(JDBC_URL);
} catch (SQLException e) {
assertThat(e.getMessage())
.contains(
"Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris");
}
}
}
}
3 changes: 2 additions & 1 deletion flink-end-to-end-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ under the License.
<module>flink-end-to-end-tests-elasticsearch7</module>
<module>flink-end-to-end-tests-common-elasticsearch</module>
<module>flink-end-to-end-tests-sql</module>
</modules>
<module>flink-sql-gateway-test</module>
</modules>

<dependencyManagement>
<dependencies>
Expand Down
84 changes: 84 additions & 0 deletions flink-table/flink-sql-gateway/bin/sql-gateway.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Start/stop a Flink SQL Gateway.

function usage() {
echo "Usage: bin/sql-gateway.sh command"
echo " commands:"
echo " start - Run a SQL Gateway as a daemon"
echo " start-foreground - Run a SQL Gateway as a console application"
echo " stop - Stop the SQL Gateway daemon"
echo " stop-all - Stop all the SQL Gateway daemons"
echo " -h | --help - Show this help message"
}

if [[ "$*" = *--help ]] || [[ "$*" = *-h ]]; then
usage
exit 0
fi

STARTSTOP=$1

if [ -z "$STARTSTOP" ]; then
STARTSTOP="start"
fi

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
usage
exit 1
fi

################################################################################
# Adopted from "flink" bash script
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
if [ "$iteration" -gt 100 ]; then
echo "Cannot resolve path: You have a cyclic symlink in $target."
break
fi
ls=`ls -ld -- "$target"`
target=`expr "$ls" : '.* -> \(.*\)$'`
iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi

ENTRYPOINT=sqlgateway

if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${@:2}"
else
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${@:2}"
fi

0 comments on commit c90d99f

Please sign in to comment.