[SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API
### What changes were proposed in this pull request?

This PR proposes to merge the namespaces of Spark Connect and PySpark by adding a CLI option `--remote` and a `spark.remote` configuration, symmetric to the existing `--master` and `spark.master`.

### Why are the changes needed?

To provide the same user experience to end users. See also the attached design document ([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)).

### Does this PR introduce _any_ user-facing change?

Yes, users can now use Spark Connect as shown below:

```
$ ./bin/pyspark --remote ...
$ ./bin/pyspark --conf spark.remote ...

...

>>> # **Same as regular PySpark from here**
... # Do something with `spark` that is a remote client
... spark.range(1)
```

```
$ ./bin/spark-submit --remote ... app.py
$ ./bin/spark-submit --conf spark.remote ... app.py

...

# **Same as regular PySpark from here**
# app.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Do something with `spark` that is a remote client
```

See also the attached design document ([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)).
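
The remote address can also be set programmatically through the session builder; `SparkSession.builder.remote` is added to the API reference in this change. A minimal sketch, assuming a Spark Connect server is reachable at the default `sc://localhost` URL used elsewhere in this PR:

```python
from pyspark.sql import SparkSession

# Sketch only: the URL is illustrative; any sc://host:port endpoint works.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# Same DataFrame API as regular PySpark; `spark` is a remote client here.
spark.range(1).show()
```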

### How was this patch tested?

Reuses the existing PySpark unit tests for DataFrame and functions.

Closes apache#39041 from HyukjinKwon/prototype_merged_pyspark.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Dec 21, 2022
1 parent 2440f69 commit 764edaf
Showing 22 changed files with 1,095 additions and 68 deletions.
@@ -74,6 +74,7 @@ object PythonRunner {
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
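
On the Python side, `pyspark.sql.utils.is_remote` (imported in the `shell.py` change below) detects this environment variable. A minimal sketch of such a check, assuming it only consults `SPARK_REMOTE`; the actual helper may look at more state:

```python
import os


def is_remote() -> bool:
    # Sketch of an is_remote-style check keyed on the SPARK_REMOTE variable
    # exported by PythonRunner above; the real pyspark.sql.utils.is_remote
    # may differ in detail.
    return "SPARK_REMOTE" in os.environ
```
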
31 changes: 20 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -229,15 +229,20 @@ private[spark] class SparkSubmit extends Logging {
var childMainClass = ""

// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
val clusterManager: Int = args.maybeMaster match {
case Some(v) =>
assert(args.maybeRemote.isEmpty)
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
case None => LOCAL // default master or remote mode.
}

// Set the deploy mode; default is client mode
@@ -259,7 +264,7 @@ private[spark] class SparkSubmit extends Logging {
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
args.maybeMaster = Option(Utils.checkAndGetK8sMasterUrl(args.master))
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
@@ -597,7 +602,11 @@ private[spark] class SparkSubmit extends Logging {
val options = List[OptionAssigner](

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(
if (args.maybeRemote.isDefined) args.maybeMaster.orNull else args.master,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(
args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = SUBMIT_DEPLOY_MODE.key),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
@@ -41,7 +41,10 @@ import org.apache.spark.util.Utils
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var maybeMaster: Option[String] = None
// Global defaults. These should be keep to minimum to avoid confusing behavior.
def master: String = maybeMaster.getOrElse("local[*]")
var maybeRemote: Option[String] = None
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
@@ -149,10 +152,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
* Load arguments from environment variables, Spark properties etc.
*/
private def loadEnvironmentArguments(): Unit = {
master = Option(master)
maybeMaster = maybeMaster
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
maybeRemote = maybeRemote
.orElse(sparkProperties.get("spark.remote"))
.orElse(env.get("SPARK_REMOTE"))

driverExtraClassPath = Option(driverExtraClassPath)
.orElse(sparkProperties.get(config.DRIVER_CLASS_PATH.key))
.orNull
@@ -210,9 +216,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
dynamicAllocationEnabled =
sparkProperties.get(DYN_ALLOCATION_ENABLED.key).exists("true".equalsIgnoreCase)

// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")

// In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
if (master.startsWith("yarn")) {
name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
@@ -242,6 +245,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
if (args.length == 0) {
printUsageAndExit(-1)
}
if (maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
error("Remote cannot be specified with master and/or deploy mode.")
}
if (primaryResource == null) {
error("Must specify a primary resource (JAR or Python or R file)")
}
@@ -299,6 +305,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
override def toString: String = {
s"""Parsed arguments:
| master $master
| remote ${maybeRemote.orNull}
| deployMode $deployMode
| executorMemory $executorMemory
| executorCores $executorCores
@@ -338,7 +345,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
name = value

case MASTER =>
master = value
maybeMaster = Option(value)

case REMOTE =>
maybeRemote = Option(value)

case CLASS =>
mainClass = value
@@ -539,6 +549,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --verbose, -v Print additional debug output.
| --version, Print the version of current Spark.
|
| Experimental options:
| --remote CONNECT_URL URL to connect to the server for Spark Connect, e.g.,
| sc://host:port. --master and --deploy-mode cannot be set
| together with this option. This option is experimental, and
| might change between minor releases.
|
| Cluster deploy mode only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
| (Default: 1).
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -512,6 +512,8 @@ def __hash__(self):
"pyspark.sql.tests.connect.test_connect_basic",
"pyspark.sql.tests.connect.test_connect_function",
"pyspark.sql.tests.connect.test_connect_column",
"pyspark.sql.tests.connect.test_parity_functions",
"pyspark.sql.tests.connect.test_parity_dataframe",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
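
The two parity modules registered above rerun the existing DataFrame and functions test suites against a remote session. A rough sketch of the pattern with illustrative names; the parity files themselves are not shown in this diff:

```python
import unittest

from pyspark.sql import SparkSession


class DataFrameParityTestsSketch(unittest.TestCase):
    """Hypothetical sketch of a parity test: the class name and test body are
    illustrative, not the contents of test_parity_dataframe.py."""

    @classmethod
    def setUpClass(cls):
        # Assumes a Spark Connect server is running at sc://localhost.
        cls.spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    def test_range_count(self):
        self.assertEqual(self.spark.range(10).count(), 10)


if __name__ == "__main__":
    unittest.main()
```
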
@@ -47,6 +47,7 @@ abstract class AbstractCommandBuilder {
String javaHome;
String mainClass;
String master;
String remote;
protected String propertiesFile;
final List<String> appArgs;
final List<String> jars;
@@ -87,6 +87,19 @@ public T setMaster(String master) {
return self();
}

/**
* Set the Spark master for the application.
*
* @param remote Spark remote url.
* @return This launcher.
*/
public T setRemote(String remote) {
checkNotNull(remote, "remote");
builder.remote = remote;
return self();
}


/**
* Set the deploy mode for the application.
*
@@ -163,6 +176,8 @@ public T addSparkArg(String name, String value) {
SparkSubmitOptionParser validator = new ArgumentValidator(true);
if (validator.MASTER.equals(name)) {
setMaster(value);
} else if (validator.REMOTE.equals(name)) {
setRemote(value);
} else if (validator.PROPERTIES_FILE.equals(name)) {
setPropertiesFile(value);
} else if (validator.CONF.equals(name)) {
@@ -192,6 +192,11 @@ List<String> buildSparkSubmitArgs() {
args.add(master);
}

if (remote != null) {
args.add(parser.REMOTE);
args.add(remote);
}

if (deployMode != null) {
args.add(parser.DEPLOY_MODE);
args.add(deployMode);
@@ -344,6 +349,7 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
// pass conf spark.pyspark.python to python by environment variable.
env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
}
env.put("SPARK_REMOTE", remote);
if (!isEmpty(pyOpts)) {
pyargs.addAll(parseOptionString(pyOpts));
}
@@ -457,9 +463,18 @@ private class OptionParser extends SparkSubmitOptionParser {
protected boolean handle(String opt, String value) {
switch (opt) {
case MASTER:
checkArgument(remote == null,
"Both master (%s) and remote (%s) cannot be set together.", master, remote);
master = value;
break;
case REMOTE:
checkArgument(remote == null,
"Both master (%s) and remote (%s) cannot be set together.", master, remote);
remote = value;
break;
case DEPLOY_MODE:
checkArgument(remote == null,
"Both deploy-mode (%s) and remote (%s) cannot be set together.", deployMode, remote);
deployMode = value;
break;
case PROPERTIES_FILE:
@@ -49,6 +49,7 @@ class SparkSubmitOptionParser {
protected final String JARS = "--jars";
protected final String KILL_SUBMISSION = "--kill";
protected final String MASTER = "--master";
protected final String REMOTE = "--remote";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
@@ -103,6 +104,7 @@ class SparkSubmitOptionParser {
{ KEYTAB },
{ KILL_SUBMISSION },
{ MASTER },
{ REMOTE },
{ NAME },
{ NUM_EXECUTORS },
{ PACKAGES },
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -36,6 +36,7 @@ See also :class:`SparkSession`.
SparkSession.builder.enableHiveSupport
SparkSession.builder.getOrCreate
SparkSession.builder.master
SparkSession.builder.remote
SparkSession.catalog
SparkSession.conf
SparkSession.createDataFrame
4 changes: 4 additions & 0 deletions python/pyspark/context.py
@@ -179,6 +179,10 @@ def __init__(
udf_profiler_cls: Type[UDFBasicProfiler] = UDFBasicProfiler,
memory_profiler_cls: Type[MemoryProfiler] = MemoryProfiler,
):
if "SPARK_REMOTE" in os.environ and "SPARK_TESTING" not in os.environ:
raise RuntimeError(
"Remote client cannot create a SparkContext. Create SparkSession instead."
)

if conf is None or conf.get("spark.executor.allowSparkContext", "false").lower() != "true":
# In order to prevent SparkContext from being created in executors.
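
In other words, once `SPARK_REMOTE` is set, constructing a `SparkContext` fails fast and users are directed to `SparkSession`. A small sketch of the resulting behavior; the environment variable value is illustrative:

```python
import os

os.environ["SPARK_REMOTE"] = "sc://localhost"  # illustrative value

from pyspark import SparkContext

try:
    SparkContext()
except RuntimeError as e:
    # "Remote client cannot create a SparkContext. Create SparkSession instead."
    print(e)
```
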
65 changes: 45 additions & 20 deletions python/pyspark/shell.py
@@ -26,32 +26,50 @@
import platform
import warnings

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.utils import is_remote

if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
if is_remote():
try:
# Creates pyspark.sql.connect.SparkSession.
spark = SparkSession.builder.getOrCreate()
except Exception:
import sys
import traceback

SparkContext._ensure_initialized()
warnings.warn("Failed to initialize Spark session.")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
version = pyspark.__version__
sc = None
else:
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

try:
spark = SparkSession._create_shell_session()
except Exception:
import sys
import traceback
SparkContext._ensure_initialized()

warnings.warn("Failed to initialize Spark session.")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
try:
spark = SparkSession._create_shell_session()
except Exception:
import sys
import traceback

sc = spark.sparkContext
sql = spark.sql
atexit.register((lambda sc: lambda: sc.stop())(sc))
warnings.warn("Failed to initialize Spark session.")
traceback.print_exc(file=sys.stderr)
sys.exit(1)

sc = spark.sparkContext
atexit.register((lambda sc: lambda: sc.stop())(sc))

# for compatibility
sqlContext = SQLContext._get_or_create(sc)
sqlCtx = sqlContext
# for compatibility
sqlContext = SQLContext._get_or_create(sc)
sqlCtx = sqlContext
version = sc.version

sql = spark.sql

print(
r"""Welcome to
@@ -61,14 +79,21 @@
/__ / .__/\_,_/_/ /_/\_\ version %s
/_/
"""
% sc.version
% version
)
print(
"Using Python version %s (%s, %s)"
% (platform.python_version(), platform.python_build()[0], platform.python_build()[1])
)
print("Spark context Web UI available at %s" % (sc.uiWebUrl))
print("Spark context available as 'sc' (master = %s, app id = %s)." % (sc.master, sc.applicationId))
if is_remote():
print("Client connected to the Spark Connect server at %s" % (os.environ["SPARK_REMOTE"]))
else:
print("Spark context Web UI available at %s" % (sc.uiWebUrl)) # type: ignore[union-attr]
print(
"Spark context available as 'sc' (master = %s, app id = %s)."
% (sc.master, sc.applicationId) # type: ignore[union-attr]
)

print("SparkSession available as 'spark'.")

# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
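
Put together, launching the shell with a remote URL now yields a Connect-backed `spark` and no `sc`. A sketch of such a session; the URL is illustrative and output is abbreviated:

```
$ ./bin/pyspark --remote "sc://localhost"
...
>>> sc is None          # no SparkContext is created in remote mode
True
>>> spark.range(1)      # `spark` is a Spark Connect client session
```
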
4 changes: 2 additions & 2 deletions python/pyspark/sql/connect/session.py
@@ -180,14 +180,14 @@ def appName(self, name: str) -> "SparkSession.Builder":
return self.config("spark.app.name", name)

def remote(self, location: str = "sc://localhost") -> "SparkSession.Builder":
return self.config("spark.connect.location", location)
return self.config("spark.remote", location)

def enableHiveSupport(self) -> "SparkSession.Builder":
raise NotImplementedError("enableHiveSupport not implemented for Spark Connect")

def getOrCreate(self) -> "SparkSession":
"""Creates a new instance."""
return SparkSession(connectionString=self._options["spark.connect.location"])
return SparkSession(connectionString=self._options["spark.remote"])

_client: SparkConnectClient

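
With the rename above, the Connect-side builder keys the URL under `spark.remote` rather than `spark.connect.location`. A minimal sketch of using that builder directly, assuming the module exposes a `builder` entry point mirroring PySpark and a server is listening at the default `sc://localhost`:

```python
from pyspark.sql.connect.session import SparkSession as RemoteSession

# Sketch: remote() now stores the URL under "spark.remote", which
# getOrCreate() reads to open the client connection.
spark = RemoteSession.builder.remote("sc://localhost").getOrCreate()
spark.range(1).collect()
```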