Skip to content

Commit

Permalink
[SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies securi…
Browse files Browse the repository at this point in the history
…ty context

Augments the JdbcConnectionProvider API such that a provider can indicate that it will need to modify the global security configuration when establishing a connection, and as such, if access to the global security configuration should be synchronized to prevent races.

### What changes were proposed in this pull request?
As suggested by gaborgsomogyi [here](https://github.com/apache/spark/pull/29024/files#r755788709), augments the `JdbcConnectionProvider` API to include a `modifiesSecurityContext` method that can be used by `ConnectionProvider` to determine when `SecurityConfigurationLock.synchronized` is required to avoid race conditions when establishing a JDBC connection.

### Why are the changes needed?
Provides a path forward for working around a significant bottleneck introduced by synchronizing `SecurityConfigurationLock` every time a connection is established. The synchronization isn't always needed and it should be at the discretion of the `JdbcConnectionProvider` to determine when locking is necessary. See [SPARK-37391](https://issues.apache.org/jira/browse/SPARK-37391) or [this thread](https://github.com/apache/spark/pull/29024/files#r754441783).

### Does this PR introduce _any_ user-facing change?
Any existing implementations of `JdbcConnectionProvider` will need to add a definition of `modifiesSecurityContext`. I'm also open to adding a default implementation, but it seemed to me that requiring an explicit implementation of the method was preferable.

A drop-in implementation that would continue the existing behavior is:
```scala
override def modifiesSecurityContext(
  driver: Driver,
  options: Map[String, String]
): Boolean = true
```

### How was this patch tested?
Unit tests, but I also plan to run a real workflow once I get the initial thumbs up on this implementation.

Closes apache#34745 from tdg5/SPARK-37391-opt-in-security-configuration-sync.

Authored-by: Danny Guinther <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
  • Loading branch information
Danny Guinther authored and sarutak committed Dec 24, 2021
1 parent 656127d commit 6cc4c90
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ class ExampleJdbcConnectionProvider extends JdbcConnectionProvider with Logging
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = false

override def getConnection(driver: Driver, options: Map[String, String]): Connection = null

override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ object MimaExcludes {
// The followings are necessary for Scala 2.13.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$"),

// [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcConnectionProvider.modifiesSecurityContext")
)

// Exclude rules for 3.2.x from 3.1.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,12 @@ private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with
logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties")
driver.connect(jdbcOptions.url, properties)
}

override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = {
// BasicConnectionProvider is the default unsecure connection provider, so just return false
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,20 @@ protected abstract class ConnectionProviderBase extends Logging {
filteredProviders.head
}

SecurityConfigurationLock.synchronized {
// Inside getConnection it's safe to get parent again because SecurityConfigurationLock
// makes sure it's untouched
val parent = Configuration.getConfiguration
try {
selectedProvider.getConnection(driver, options)
} finally {
logDebug("Restoring original security configuration")
Configuration.setConfiguration(parent)
if (selectedProvider.modifiesSecurityContext(driver, options)) {
SecurityConfigurationLock.synchronized {
// Inside getConnection it's safe to get parent again because SecurityConfigurationLock
// makes sure it's untouched
val parent = Configuration.getConfiguration
try {
selectedProvider.getConnection(driver, options)
} finally {
logDebug("Restoring original security configuration")
Configuration.setConfiguration(parent)
}
}
} else {
selectedProvider.getConnection(driver, options)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,27 @@ abstract class JdbcConnectionProvider {
def canHandle(driver: Driver, options: Map[String, String]): Boolean

/**
* Opens connection toward the database. Since global JVM security configuration change may needed
* this API is called synchronized by `SecurityConfigurationLock` to avoid race.
* Opens connection to the database. Since global JVM security configuration change may be
* needed this API is called synchronized by `SecurityConfigurationLock` to avoid race when
* `modifiesSecurityContext` returns true for the given driver with the given options.
*
* @param driver Java driver which initiates the connection
* @param options Driver options which initiates the connection
* @return a `Connection` object that represents a connection to the URL
*/
def getConnection(driver: Driver, options: Map[String, String]): Connection

/**
* Checks if this connection provider instance needs to modify global security configuration to
* handle authentication and thus should synchronize access to the security configuration while
* the given driver is initiating a connection with the given options.
*
* @param driver Java driver which initiates the connection
* @param options Driver options which initiates the connection
* @return True if the connection provider will need to modify the security configuration when
* initiating a connection with the given driver with the given options.
*
* @since 3.1.3
*/
def modifiesSecurityContext(driver: Driver, options: Map[String, String]): Boolean
}
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ Implementation considerations:
* CPs are running in heavy multi-threaded environment and adding a state into a CP is not advised.
If any state added then it must be synchronized properly. It could cause quite some headache to
hunt down such issues.
* Some of the CPs are modifying the JVM global security context so `getConnection` method is
synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race.
* Some of the CPs are modifying the JVM global security context so if the CP's
`modifiesSecurityContext` method returns `true` then the CP's `getConnection` method will
be called synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import javax.security.auth.login.Configuration
import org.scalatestplus.mockito.MockitoSugar

import org.apache.spark.SparkConf
import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.jdbc.JdbcConnectionProvider
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -68,12 +69,20 @@ class ConnectionProviderSuite
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection =
throw new RuntimeException()
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}
val provider2 = new JdbcConnectionProvider() {
override val name: String = "test2"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection =
throw new RuntimeException()
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}

val providerBase = new ConnectionProviderBase() {
Expand All @@ -92,12 +101,20 @@ class ConnectionProviderSuite
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection =
throw new RuntimeException()
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}
val provider2 = new JdbcConnectionProvider() {
override val name: String = "test2"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection =
mock[Connection]
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}

val providerBase = new ConnectionProviderBase() {
Expand All @@ -107,12 +124,50 @@ class ConnectionProviderSuite
assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection])
}

test("Synchronize on SecurityConfigurationLock when the specified connection provider needs") {
val provider1 = new JdbcConnectionProvider() {
override val name: String = "test1"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
assert(Thread.holdsLock(SecurityConfigurationLock))
mock[Connection]
}
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = true
}
val provider2 = new JdbcConnectionProvider() {
override val name: String = "test2"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
assert(!Thread.holdsLock(SecurityConfigurationLock))
mock[Connection]
}
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}

val providerBase = new ConnectionProviderBase() {
override val providers = Seq(provider1, provider2)
}
// We don't expect any exceptions or null here
assert(providerBase.create(mock[Driver], Map.empty, Some("test1")).isInstanceOf[Connection])
assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection])
}

test("Throw an error when user specified provider that does not exist") {
val provider = new JdbcConnectionProvider() {
override val name: String = "provider"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection =
throw new RuntimeException()
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}

val providerBase = new ConnectionProviderBase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvid
override val name: String = "IntentionallyFaultyConnectionProvider"
override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
override def getConnection(driver: Driver, options: Map[String, String]): Connection = null
override def modifiesSecurityContext(
driver: Driver,
options: Map[String, String]
): Boolean = false
}

private object IntentionallyFaultyConnectionProvider {
Expand Down

0 comments on commit 6cc4c90

Please sign in to comment.