[SPARK-49282][CONNECT][SQL] Create a shared SparkSessionBuilder interface

### What changes were proposed in this pull request?
This PR adds a shared SparkSessionBuilder interface.

It also adds a SparkSessionCompanion interface, which is meant to be implemented by all SparkSession companions (a.k.a. `object SparkSession`). This is currently the entry point for session building; in the future we will also add the management of active/default sessions.

Finally, we add a companion for api.SparkSession. This binds the implementation that currently lives in `org.apache.spark.sql`, which makes it possible to work exclusively with the interface instead of selecting an implementation upfront. A sketch of what that usage can look like follows.
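
As a rough sketch (not part of this diff), code written against the shared interface can configure a session without naming a concrete implementation. The helper name and the option values below are hypothetical; the `SparkSessionBuilder` type and the `appName`/`config` methods appear in the diff further down:

```scala
import org.apache.spark.sql.api.SparkSessionBuilder

// Hypothetical helper: it accepts any SparkSessionBuilder (Classic or
// Connect) and chains calls through the builder's `this.type` returns,
// so the caller gets back the same builder it passed in.
def tune(builder: SparkSessionBuilder): builder.type =
  builder
    .appName("shared-api-demo") // a deprecated no-op on Connect
    .config("spark.sql.shuffle.partitions", 4L)
```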

### Why are the changes needed?
We are creating a shared Scala SQL interface. Building a session is part of this interface.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests. I have added tests for the implementation binding.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48229 from hvanhovell/SPARK-49282.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
hvanhovell committed Sep 25, 2024
1 parent 29ed272 commit 5fb0ff9
Showing 9 changed files with 388 additions and 215 deletions.
7 changes: 7 additions & 0 deletions connector/connect/client/jvm/pom.xml
@@ -88,6 +88,13 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-api_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-common-utils_${scala.binary.version}</artifactId>
@@ -509,7 +509,7 @@ class SparkSession private[sql] (

// The minimal builder needed to create a spark session.
// TODO: implements all methods mentioned in the scaladoc of [[SparkSession]]
object SparkSession extends Logging {
object SparkSession extends api.SparkSessionCompanion with Logging {
private val MAX_CACHED_SESSIONS = 100
private val planIdGenerator = new AtomicLong
private var server: Option[Process] = None
@@ -618,15 +618,15 @@ object SparkSession extends Logging {
*/
def builder(): Builder = new Builder()

class Builder() extends Logging {
class Builder() extends api.SparkSessionBuilder {
// Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
// by default, if it exists. The connection string can be overridden using
// the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
private val builder = SparkConnectClient.builder().loadFromEnvironment()
private var client: SparkConnectClient = _
private[this] val options = new scala.collection.mutable.HashMap[String, String]

def remote(connectionString: String): Builder = {
/** @inheritdoc */
def remote(connectionString: String): this.type = {
builder.connectionString(connectionString)
this
}
@@ -638,93 +638,45 @@
*
* @since 3.5.0
*/
def interceptor(interceptor: ClientInterceptor): Builder = {
def interceptor(interceptor: ClientInterceptor): this.type = {
builder.interceptor(interceptor)
this
}

private[sql] def client(client: SparkConnectClient): Builder = {
private[sql] def client(client: SparkConnectClient): this.type = {
this.client = client
this
}

/**
* Sets a config option. Options set using this method are automatically propagated to the
* Spark Connect session. Only runtime options are supported.
*
* @since 3.5.0
*/
def config(key: String, value: String): Builder = synchronized {
options += key -> value
this
}
/** @inheritdoc */
override def config(key: String, value: String): this.type = super.config(key, value)

/**
* Sets a config option. Options set using this method are automatically propagated to the
* Spark Connect session. Only runtime options are supported.
*
* @since 3.5.0
*/
def config(key: String, value: Long): Builder = synchronized {
options += key -> value.toString
this
}
/** @inheritdoc */
override def config(key: String, value: Long): this.type = super.config(key, value)

/**
* Sets a config option. Options set using this method are automatically propagated to the
* Spark Connect session. Only runtime options are supported.
*
* @since 3.5.0
*/
def config(key: String, value: Double): Builder = synchronized {
options += key -> value.toString
this
}
/** @inheritdoc */
override def config(key: String, value: Double): this.type = super.config(key, value)

/**
* Sets a config option. Options set using this method are automatically propagated to the
* Spark Connect session. Only runtime options are supported.
*
* @since 3.5.0
*/
def config(key: String, value: Boolean): Builder = synchronized {
options += key -> value.toString
this
}
/** @inheritdoc */
override def config(key: String, value: Boolean): this.type = super.config(key, value)

/**
* Sets a config a map of options. Options set using this method are automatically propagated
* to the Spark Connect session. Only runtime options are supported.
*
* @since 3.5.0
*/
def config(map: Map[String, Any]): Builder = synchronized {
map.foreach { kv: (String, Any) =>
{
options += kv._1 -> kv._2.toString
}
}
this
}
/** @inheritdoc */
override def config(map: Map[String, Any]): this.type = super.config(map)

/**
* Sets a config option. Options set using this method are automatically propagated to both
* `SparkConf` and SparkSession's own configuration.
*
* @since 3.5.0
*/
def config(map: java.util.Map[String, Any]): Builder = synchronized {
config(map.asScala.toMap)
}
/** @inheritdoc */
override def config(map: java.util.Map[String, Any]): this.type = super.config(map)

/** @inheritdoc */
@deprecated("enableHiveSupport does not work in Spark Connect")
def enableHiveSupport(): Builder = this
override def enableHiveSupport(): this.type = this

/** @inheritdoc */
@deprecated("master does not work in Spark Connect, please use remote instead")
def master(master: String): Builder = this
override def master(master: String): this.type = this

/** @inheritdoc */
@deprecated("appName does not work in Spark Connect")
def appName(name: String): Builder = this
override def appName(name: String): this.type = this

private def tryCreateSessionFromClient(): Option[SparkSession] = {
if (client != null && client.isSessionValid) {
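
For orientation: the `super.config(...)` delegations above imply that the shared `api.SparkSessionBuilder` itself owns the option map and the synchronized `config` overloads. The following is a simplified sketch of that contract, an assumption about its shape rather than a copy of the actual trait:

```scala
package org.apache.spark.sql.api

import scala.jdk.CollectionConverters._

// Assumed sketch of the shared builder contract; the real interface may
// differ in details (e.g. which methods are abstract vs. implemented).
abstract class SparkSessionBuilder {
  protected val options = new scala.collection.mutable.HashMap[String, String]

  def remote(connectionString: String): this.type
  def appName(name: String): this.type
  def master(master: String): this.type
  def enableHiveSupport(): this.type
  def getOrCreate(): SparkSession

  def config(key: String, value: String): this.type = synchronized {
    options += key -> value
    this
  }
  def config(key: String, value: Long): this.type = config(key, value.toString)
  def config(key: String, value: Double): this.type = config(key, value.toString)
  def config(key: String, value: Boolean): this.type = config(key, value.toString)
  def config(map: Map[String, Any]): this.type = synchronized {
    map.foreach { case (k, v) => options += k -> v.toString }
    this
  }
  def config(map: java.util.Map[String, Any]): this.type =
    config(map.asScala.toMap)
}
```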
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.api.SparkSessionBuilder
import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession}

/**
* Make sure the api.SparkSessionBuilder binds to Connect implementation.
*/
class SparkSessionBuilderImplementationBindingSuite
extends ConnectFunSuite
with api.SparkSessionBuilderImplementationBindingSuite
with RemoteSparkSession {
override protected def configure(builder: SparkSessionBuilder): builder.type = {
// We need to set this configuration because the port used by the server is random.
builder.remote(s"sc://localhost:$serverPort")
}
}
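
The shared `api.SparkSessionBuilderImplementationBindingSuite` mixed in here is the counterpart shipped in the `spark-sql-api` test jar added to the pom above. A hypothetical reduction of what such a suite needs, expressed purely in api-level types (the test body and the `version`/`close` calls are assumptions, not the actual suite):

```scala
package org.apache.spark.sql.api

import org.scalatest.funsuite.AnyFunSuite

// Hypothetical reduction: only api-level types appear, so the concrete
// module bound to api.SparkSession determines which implementation runs.
trait SparkSessionBuilderImplementationBindingSuite { self: AnyFunSuite =>
  // Concrete suites (Connect or Classic) supply implementation-specific
  // configuration, e.g. the remote address of a test server.
  protected def configure(builder: SparkSessionBuilder): builder.type

  test("SparkSession.builder() binds to a working implementation") {
    val session = configure(SparkSession.builder()).getOrCreate()
    try assert(session.version.nonEmpty)
    finally session.close()
  }
}
```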
46 changes: 25 additions & 21 deletions project/MimaExcludes.scala
@@ -125,26 +125,6 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),

// SPARK-49414: Remove Logging from DataFrameReader.
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.DataFrameReader"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logName"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.log"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.isTraceEnabled"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary$default$2"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeForcefully"),

// SPARK-49425: Create a shared DataFrameWriter interface.
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriter"),

@@ -195,7 +175,11 @@
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLImplicits$StringToColumn"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$implicits$"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.SQLImplicits.session"),
)

// SPARK-49282: Shared SparkSessionBuilder
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$Builder"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")

// Default exclude rules
lazy val defaultExcludes = Seq(
@@ -236,6 +220,26 @@
}
)

private def loggingExcludes(fqn: String) = {
Seq(
ProblemFilters.exclude[MissingTypesProblem](fqn),
missingMethod(fqn, "logName"),
missingMethod(fqn, "log"),
missingMethod(fqn, "logInfo"),
missingMethod(fqn, "logDebug"),
missingMethod(fqn, "logTrace"),
missingMethod(fqn, "logWarning"),
missingMethod(fqn, "logError"),
missingMethod(fqn, "isTraceEnabled"),
missingMethod(fqn, "initializeLogIfNecessary"),
missingMethod(fqn, "initializeLogIfNecessary$default$2"),
missingMethod(fqn, "initializeForcefully"))
}

private def missingMethod(names: String*) = {
ProblemFilters.exclude[DirectMissingMethodProblem](names.mkString("."))
}

def excludes(version: String): Seq[Problem => Boolean] = version match {
case v if v.startsWith("4.0") => v40excludes
case _ => Seq()
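
A quick worked example of the refactor's varargs trick (illustrative, standalone):

```scala
// missingMethod joins its arguments with "." before building the filter, so
// missingMethod("org.apache.spark.sql.DataFrameReader", "logName") excludes
// "org.apache.spark.sql.DataFrameReader.logName"; the same string the old
// hand-written rules spelled out. The joining step in isolation:
def joined(names: String*): String = names.mkString(".")

assert(joined("org.apache.spark.sql.DataFrameReader", "logName") ==
  "org.apache.spark.sql.DataFrameReader.logName")
```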