[SPARK-39399][CORE][K8S] Fix proxy-user authentication for Spark on k8s in cluster deploy mode

### What changes were proposed in this pull request?

The PR fixes the authentication failure of the proxy user on the driver side when accessing kerberized HDFS from a Spark on K8s job. It follows a similar approach to the one used for Mesos: d2iq-archive#26
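For context, the essence of the fix is to run the driver-side main as the Spark user, whose credentials hold the delegation tokens, instead of wrapping it in a freshly created proxy UGI. Below is a minimal sketch of that idea using Hadoop's `UserGroupInformation` API; the helper name `runAsSparkUserSketch` and the `SPARK_USER` fallback are illustrative, not the actual Spark source:

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object RunAsSparkUserSketch {
  // Run `body` as the Spark user while keeping the current user's
  // credentials (and thus its delegation tokens) visible.
  def runAsSparkUserSketch(body: () => Unit): Unit = {
    val current = UserGroupInformation.getCurrentUser()
    val user = Option(System.getenv("SPARK_USER")).getOrElse(current.getShortUserName())
    val ugi = UserGroupInformation.createRemoteUser(user)
    // Unlike createProxyUser, carry over the existing credentials explicitly,
    // so HDFS delegation tokens obtained at submission time remain usable.
    ugi.addCredentials(current.getCredentials())
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = body()
    })
  }
}
```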

### Why are the changes needed?

When we try to access kerberized HDFS through a proxy user in a Spark job running in cluster deploy mode with the Kubernetes resource manager, we encounter an AccessControlException. This is because authentication on the driver is done using the proxy user's tokens, and since the proxy user has no delegation tokens on the driver, authentication fails.

Further details:
https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532063&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532063

https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532135&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532135
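To make the failure mode concrete, here is a minimal sketch assuming a kerberized Hadoop client setup (the user name `alice` and the HDFS path are hypothetical): a proxy UGI created with `createProxyUser` starts with empty credentials, so filesystem calls made under it inside the driver pod cannot present the delegation tokens that live with the real user.

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserFailureSketch {
  def main(args: Array[String]): Unit = {
    // The real (Kerberos-authenticated) user on the driver.
    val realUser = UserGroupInformation.getCurrentUser()
    // The proxy UGI has no delegation tokens of its own.
    val proxyUser = UserGroupInformation.createProxyUser("alice", realUser)
    proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // On the K8s driver this is where AccessControlException surfaces:
        // the tokens shipped to the driver belong to the real user's
        // credentials, not to this freshly created proxy UGI.
        val fs = FileSystem.get(new Configuration())
        fs.listStatus(new Path("/user/alice")).foreach(s => println(s.getPath))
      }
    })
  }
}
```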

### Does this PR introduce _any_ user-facing change?

Yes, users will now be able to use a proxy user to access kerberized HDFS with Spark on K8s.
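For example, a submission like `spark-submit --master k8s://https://<api-server> --deploy-mode cluster --proxy-user alice ...` (cluster address and user name illustrative) should now be able to read and write kerberized HDFS from the driver.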

### How was this patch tested?

The patch was tested by:

1. Running a job that accesses kerberized HDFS with a proxy user, in both cluster and client deploy mode, with the Kubernetes resource manager.

2. Running a job that accesses kerberized HDFS without a proxy user, in both cluster and client deploy mode, with the Kubernetes resource manager.

3. Building and running the tests via GitHub Actions: https://github.com/shrprasa/spark/actions/runs/3051203625

Closes apache#37880 from shrprasa/proxy_user_fix.

Authored-by: Shrikant Prasad <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
shrprasa authored and yaooqinn committed Mar 8, 2023
1 parent 1507a52 commit b3b3557
Showing 1 changed file with 29 additions and 18 deletions: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,24 +158,35 @@ private[spark] class SparkSubmit extends Logging {
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
-        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
-          UserGroupInformation.getCurrentUser())
-        try {
-          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-            override def run(): Unit = {
-              runMain(args, uninitLog)
-            }
-          })
-        } catch {
-          case e: Exception =>
-            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
-            // makes the message printed to the output by the JVM not very helpful. Instead,
-            // detect exceptions with empty stack traces here, and treat them differently.
-            if (e.getStackTrace().length == 0) {
-              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-            } else {
-              throw e
-            }
+        // Here we are checking for client mode because when the job is submitted in cluster
+        // deploy mode with the k8s resource manager, the spark-submit in the driver container
+        // is done in client mode.
+        val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+          args.deployMode.equals("client") &&
+          args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", false)
+        if (isKubernetesClusterModeDriver) {
+          logInfo("Running driver with proxy user. Cluster manager: Kubernetes")
+          SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
+        } else {
+          val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+            UserGroupInformation.getCurrentUser())
+          try {
+            proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+              override def run(): Unit = {
+                runMain(args, uninitLog)
+              }
+            })
+          } catch {
+            case e: Exception =>
+              // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+              // makes the message printed to the output by the JVM not very helpful. Instead,
+              // detect exceptions with empty stack traces here, and treat them differently.
+              if (e.getStackTrace().length == 0) {
+                error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              } else {
+                throw e
+              }
+              }
+          }
         }
       } else {
         runMain(args, uninitLog)
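Note the guard: in K8s cluster deploy mode, spark-submit runs a second time inside the driver container in client mode, which is what the `spark.kubernetes.submitInDriver` check detects. At that point the delegation tokens obtained at submission time are already available to the real user's credentials, so the driver runs via `SparkHadoopUtil.get.runAsSparkUser` rather than creating a second, token-less proxy UGI.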
