[SPARK-16757] Set up Spark caller context to HDFS and YARN
## What changes were proposed in this pull request?

1. Pass `jobId`, `appId`, and `appAttemptId` to `Task`.
2. Invoke the Hadoop caller-context API:
    * A new utility class `CallerContext` is added in `Utils.scala`. Its `setCurrentContext()` method invokes the `org.apache.hadoop.ipc.CallerContext` API to set up the Spark caller context, which is then written into `hdfs-audit.log` and the Yarn RM audit log (see the sketch after this list).
    * For HDFS: Spark sets up its caller context by invoking `org.apache.hadoop.ipc.CallerContext` in `Task` and in the Yarn `Client` and `ApplicationMaster`.
    * For Yarn: Spark sets up its caller context by invoking `org.apache.hadoop.ipc.CallerContext` in the Yarn `Client`.
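
Spark calls this Hadoop API through reflection (see the `Utils.scala` changes below), so it still compiles and runs against Hadoop versions older than 2.8, where the API does not exist. As a rough sketch, the direct equivalent on Hadoop 2.8+ would look like this (the context string is a made-up example):

```
// Requires Hadoop 2.8+ on the classpath; older releases do not ship this API.
import org.apache.hadoop.ipc.CallerContext

// Hypothetical context string; Spark builds the real one from app/job/stage/task ids.
val context = "SPARK_CLIENT_application_1474394339641_0006"

// The caller context is thread-local: subsequent HDFS / Yarn RPCs issued from this
// thread carry it, and the NameNode / RM record it in their audit logs when
// caller-context auditing is enabled.
CallerContext.setCurrent(new CallerContext.Builder(context).build())
```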

## How was this patch tested?
Manual tests against several Spark applications in Yarn client mode and Yarn cluster mode, checking that the Spark caller contexts are written into HDFS `hdfs-audit.log` and the Yarn RM audit log.

For example, run SparkKmeans in Yarn client mode:
```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
```

**Before**:
There is no Spark caller context in the records of `hdfs-audit.log` or the Yarn RM audit log.

**After**:
Spark caller contexts are written into the records of `hdfs-audit.log` and the Yarn RM audit log.

These are records in `hdfs-audit.log` (Yarn client mode):
```
2016-09-20 11:54:24,116 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_CLIENT_AppId_application_1474394339641_0005
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0
```

These are records in `hdfs-audit.log` (Yarn cluster mode):
```
2016-09-20 11:59:33,868 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=mkdirs	src=/private/tmp/hadoop-wyang/nm-local-dir/usercache/wyang/appcache/application_1474394339641_0006/container_1474394339641_0006_01_000001/spark-warehouse	dst=null	perm=wyang:supergroup:rwxr-xr-x	proto=rpc	callerContext=SPARK_APPLICATION_MASTER_AppId_application_1474394339641_0006_AttemptId_1
2016-09-20 11:59:37,214 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0
2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0
2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0
2016-09-20 11:59:42,391 INFO FSNamesystem.audit: allowed=true	ugi=wyang (auth:SIMPLE)	ip=/127.0.0.1	cmd=open	src=/lr_big.txt	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_3_AttemptNum_0
```
This is a record in the Yarn RM audit log:
```
2016-09-20 11:59:24,050 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=wyang	IP=127.0.0.1	OPERATION=Submit Application Request	TARGET=ClientRMService	RESULT=SUCCESS	APPID=application_1474394339641_0006	CALLERCONTEXT=SPARK_CLIENT_AppId_application_1474394339641_0006
```
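
For reference, reading one of the task-level `callerContext` values above from left to right, the fields are laid out as follows (angle brackets mark variable parts; the square-bracketed application attempt id shows up only in the cluster-mode records, where the driver runs inside the ApplicationMaster):

```
SPARK_TASK_AppId_<application id>[_AttemptId_<application attempt id>]_JobId_<job id>_StageId_<stage id>_AttemptId_<stage attempt id>_TaskId_<task id>_AttemptNum_<task attempt number>
```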

Author: Weiqing Yang <[email protected]>

Closes apache#14659 from Sherry302/callercontextSubmit.
weiqingy authored and Tom Graves committed Sep 27, 2016
1 parent 7f16aff commit 6a68c5d
Showing 8 changed files with 126 additions and 10 deletions.
@@ -1015,7 +1015,8 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage =>
@@ -1024,7 +1025,8 @@
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -43,7 +43,12 @@ import org.apache.spark.rdd.RDD
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
*/
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] class ResultTask[T, U](
stageId: Int,
stageAttemptId: Int,
@@ -52,8 +57,12 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
metrics: TaskMetrics)
extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
metrics: TaskMetrics,
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
appId, appAttemptId)
with Serializable {

@transient private[this] val preferredLocs: Seq[TaskLocation] = {
@@ -44,6 +44,11 @@ import org.apache.spark.shuffle.ShuffleWriter
* @param locs preferred task execution locations for locality scheduling
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] class ShuffleMapTask(
stageId: Int,
@@ -52,8 +57,12 @@ private[spark] class ShuffleMapTask(
partition: Partition,
@transient private var locs: Seq[TaskLocation],
metrics: TaskMetrics,
localProperties: Properties)
extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
localProperties: Properties,
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
appId, appAttemptId)
with Logging {

/** A constructor used only in test suites. This does not require passing in an RDD. */
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
import org.apache.spark.util._

/**
* A unit of execution. We have two kinds of Task's in Spark:
@@ -47,14 +47,22 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
* @param partitionId index of the partition in the RDD
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
// The default value is only used in tests.
val metrics: TaskMetrics = TaskMetrics.registered,
@transient var localProperties: Properties = new Properties) extends Serializable {
@transient var localProperties: Properties = new Properties,
val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None) extends Serializable {

/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -79,9 +87,14 @@ private[spark] abstract class Task[T](
metrics)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()

if (_killed) {
kill(interruptThread = false)
}

new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()

try {
runTask(context)
} catch {
62 changes: 62 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2440,6 +2440,68 @@ private[spark] object Utils extends Logging {
}
}

/**
* A utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` is
* constructed from the parameters passed in.
* When Spark applications run on Yarn and HDFS, their caller contexts are written into the Yarn RM
* audit log and hdfs-audit.log. That can help users better diagnose and understand how specific
* applications are impacting parts of the Hadoop system and what potential problems they may be
* creating (e.g. overloading the NameNode). As noted in HDFS-9184, for a given HDFS operation, it is
* very helpful to track which upper-level job issued it.
*
* @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
*
* The parameters below are optional:
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
* @param jobId id of the job this task belongs to
* @param stageId id of the stage this task belongs to
* @param stageAttemptId attempt id of the stage this task belongs to
* @param taskId task id
* @param taskAttemptNumber task attempt number
*/
private[spark] class CallerContext(
from: String,
appId: Option[String] = None,
appAttemptId: Option[String] = None,
jobId: Option[Int] = None,
stageId: Option[Int] = None,
stageAttemptId: Option[Int] = None,
taskId: Option[Long] = None,
taskAttemptNumber: Option[Int] = None) extends Logging {

val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
val taskAttemptNumberStr =
if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""

val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr

/**
* Set up the caller context [[context]] by invoking the Hadoop CallerContext API
* [[org.apache.hadoop.ipc.CallerContext]], which was added in Hadoop 2.8.
*/
def setCurrentContext(): Boolean = {
var succeed = false
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
val hdfsContext = Builder.getMethod("build").invoke(builderInst)
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
succeed = true
} catch {
case NonFatal(e) => logInfo("Failed to set Spark caller context", e)
}
succeed
}
}

/**
* A utility class to redirect the child process's stdout or stderr.
*/
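
Below is a minimal, hypothetical usage sketch of the `CallerContext` class added above, showing the string its concatenation logic produces for made-up ids (note that the code emits the shortened `JId_`/`SId_`/`TId_` field prefixes, whereas the sample audit-log records in the description show the longer `AppId_`/`JobId_`/`StageId_` forms):

```
import org.apache.spark.util.CallerContext

// Hypothetical ids; the string layout follows the concatenation in CallerContext above.
// (CallerContext is private[spark], so this snippet would have to live inside the
// org.apache.spark package.)
val ctx = new CallerContext(
  from = "TASK",
  appId = Some("application_1474394339641_0006"),
  appAttemptId = Some("1"),
  jobId = Some(0),
  stageId = Some(0),
  stageAttemptId = Some(0),
  taskId = Some(2L),
  taskAttemptNumber = Some(0))

// ctx.context == "SPARK_TASK_application_1474394339641_0006_1_JId_0_SId_0_0_TId_2_0"

// Returns false (and logs the failure) when org.apache.hadoop.ipc.CallerContext is not
// on the classpath, i.e. when running against Hadoop releases older than 2.8.
ctx.setCurrentContext()
```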
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -788,6 +788,18 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
.set("spark.executor.instances", "1")) === 3)
}

test("Set Spark CallerContext") {
val context = "test"
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
assert(new CallerContext(context).setCurrentContext())
assert(s"SPARK_$context" ===
callerContext.getMethod("getCurrent").invoke(null).toString)
} catch {
case e: ClassNotFoundException =>
assert(!new CallerContext(context).setCurrentContext())
}
}

test("encodeFileNameToURIRawPath") {
assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
@@ -184,6 +184,8 @@ private[spark] class ApplicationMaster(
try {
val appAttemptId = client.getAttemptId()

var attemptID: Option[String] = None

if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

attemptID = Option(appAttemptId.getAttemptId.toString)
}

new CallerContext("APPMASTER",
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

logInfo("ApplicationAttemptId: " + appAttemptId)

val fs = FileSystem.get(yarnConf)
Expand Down
@@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.{CallerContext, Utils}

private[spark] class Client(
val args: ClientArguments,
@@ -161,6 +161,8 @@ private[spark] class Client(
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)

new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
