[SPARK-42371][CONNECT] Add scripts to start and stop Spark Connect server

### What changes were proposed in this pull request?

This PR proposes to add scripts to start and stop the Spark Connect server.

### Why are the changes needed?

Currently, there is no proper way to start and stop the Spark Connect server; it has to be started through, for example, a Spark shell:

```bash
# For development,
./bin/spark-shell \
  --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar` \
  --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
```

```bash
# For released Spark versions
./bin/spark-shell \
  --packages org.apache.spark:spark-connect_2.12:3.4.0 \
  --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
```

which is awkward.

### Does this PR introduce _any_ user-facing change?

Yes, it adds new scripts to start and stop the Spark Connect server.
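
For example, a full round trip with the new scripts looks like this (a sketch based on the commands in the testing section below; 15002 is the server's default port):

```bash
# Start the server as a daemon; remaining flags pass through to spark-submit.
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0

# Connect a client, e.g. the PySpark shell.
bin/pyspark --remote sc://localhost:15002

# Stop the daemon.
sbin/stop-connect-server.sh
```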

### How was this patch tested?

Manually tested:

```bash
# For released Spark versions:
# sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0

# For development:
sbin/start-connect-server.sh --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`
```

```bash
bin/pyspark --remote sc://localhost:15002
...
```

```bash
sbin/stop-connect-server.sh
ps -fe | grep Spark
```
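
To confirm the daemon actually exited, the `ps` check above should show no `SparkConnectServer` process; a stricter scripted check (a sketch) could be:

```bash
# Succeeds (exit 0) only when no Connect server JVM is left running:
! pgrep -f org.apache.spark.sql.connect.service.SparkConnectServer
```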

Closes apache#39928 from HyukjinKwon/exec-script.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Feb 8, 2023
1 parent 839c56a commit 1126031
Showing 6 changed files with 143 additions and 6 deletions.
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connect.service

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
 * The Spark Connect server.
 */
object SparkConnectServer extends Logging {
  def main(args: Array[String]): Unit = {
    // Set the active Spark Session, and start the SparkEnv instance (via the Spark Context).
    logInfo("Starting Spark session.")
    val session = SparkSession.builder.getOrCreate()
    try {
      try {
        SparkConnectService.start()
        logInfo("Spark Connect server started.")
      } catch {
        case e: Exception =>
          logError("Error starting Spark Connect server", e)
          System.exit(-1)
      }
      SparkConnectService.server.awaitTermination()
    } finally {
      session.stop()
    }
  }
}
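
This main class is what the new start script submits; the daemon launch boils down to the following (mirroring the `exec` line in `sbin/start-connect-server.sh` further down):

```bash
# Run the Connect server main class as daemon instance 1 via spark-daemon.sh;
# any user-supplied flags are appended after --name.
exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit \
  org.apache.spark.sql.connect.service.SparkConnectServer 1 \
  --name "Spark Connect server"
```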
@@ -224,7 +224,7 @@ object SparkConnectService {
  // different or complex type easily.
  private type SessionCacheKey = (String, String)

-  private var server: Server = _
+  private[connect] var server: Server = _

  // For testing purpose, it's package level private.
  private[connect] lazy val localPort = {
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -290,6 +290,8 @@ private[spark] class SparkSubmit extends Logging {
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case (_, CLUSTER) if isConnectServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Connect server.")
case _ =>
}

@@ -972,6 +974,10 @@ private[spark] class SparkSubmit extends Logging {
          if (childMainClass.contains("thriftserver")) {
            logInfo(s"Failed to load main class $childMainClass.")
            logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
+         } else if (childMainClass.contains("org.apache.spark.sql.connect")) {
+           logInfo(s"Failed to load main class $childMainClass.")
+           // TODO(SPARK-42375): Should point out the user-facing page here instead.
+           logInfo("You need to specify Spark Connect jars with --jars or --packages.")
          }
          throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
        case e: NoClassDefFoundError =>
@@ -1006,7 +1012,8 @@ private[spark] class SparkSubmit extends Logging {
      throw findCause(t)
    } finally {
      if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
-         !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) {
+         !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
+         !isConnectServer(args.mainClass)) {
        try {
          SparkContext.getActive.foreach(_.stop())
        } catch {
@@ -1130,6 +1137,13 @@ object SparkSubmit extends CommandLineUtils with Logging {
    mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
  }

+ /**
+  * Return whether the given main class represents a connect server.
+  */
+ private def isConnectServer(mainClass: String): Boolean = {
+   mainClass == "org.apache.spark.sql.connect.service.SparkConnectServer"
+ }
+
  /**
   * Return whether the given primary resource requires running python.
   */
@@ -87,6 +87,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
        SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
        SparkLauncher.NO_RESOURCE);
+   specialClasses.put("org.apache.spark.sql.connect.service.SparkConnectServer",
+       SparkLauncher.NO_RESOURCE);
  }

final List<String> userArgs;
@@ -267,8 +269,8 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
-   // Take Thrift Server as daemon
-   if (isThriftServer(mainClass)) {
+   // Take Thrift/Connect Server as daemon
+   if (isThriftServer(mainClass) || isConnectServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
@@ -288,9 +290,10 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
    // - SPARK_DRIVER_MEMORY env variable
    // - SPARK_MEM env variable
    // - default value (1g)
-   // Take Thrift Server as daemon
+   // Take Thrift/Connect Server as daemon
    String tsMemory =
-     isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
+     isThriftServer(mainClass) || isConnectServer(mainClass) ?
+       System.getenv("SPARK_DAEMON_MEMORY") : null;
    String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
    cmd.add("-Xmx" + memory);
@@ -423,6 +426,14 @@ private boolean isThriftServer(String mainClass) {
        mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
  }

+ /**
+  * Return whether the given main class represents a connect server.
+  */
+ private boolean isConnectServer(String mainClass) {
+   return (mainClass != null &&
+     mainClass.equals("org.apache.spark.sql.connect.service.SparkConnectServer"));
+ }
+
  private String findExamplesAppJar() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    if (isTesting) {
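With the Connect server registered as a daemon class in the launcher, its JVM settings follow the `SPARK_DAEMON_*` variables; per the `firstNonEmpty` chain above, `SPARK_DAEMON_MEMORY` takes precedence over `spark.driver.memory` and `SPARK_DRIVER_MEMORY`. A sketch:

```bash
# Daemon-style JVM settings picked up when launching the Connect server:
export SPARK_DAEMON_MEMORY=2g                  # becomes -Xmx2g for the server JVM
export SPARK_DAEMON_JAVA_OPTS="-XX:+UseG1GC"   # extra daemon JVM options
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```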
41 changes: 41 additions & 0 deletions sbin/start-connect-server.sh
@@ -0,0 +1,41 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Enter posix mode for bash
set -o posix

# Shell script for starting the Spark Connect server
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.sql.connect.service.SparkConnectServer"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-connect-server.sh [options]"

  "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
  exit 1
fi

. "${SPARK_HOME}/bin/load-spark-env.sh"

exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "Spark Connect server" "$@"
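
All arguments are forwarded through `spark-daemon.sh` to `spark-submit`, so the usual submit flags apply unchanged. A sketch (assuming `spark.connect.grpc.binding.port` is the config that controls the listening port):

```bash
# Start a dev build of the server on a non-default port:
sbin/start-connect-server.sh \
  --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar` \
  --conf spark.connect.grpc.binding.port=15003
```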
26 changes: 26 additions & 0 deletions sbin/stop-connect-server.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stops the connect server on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.sql.connect.service.SparkConnectServer 1
