SPARK-2035: Store call stack for stages, display it on the UI.
I'm not sure about the test -- I get a lot of unrelated failures for some reason. I'll try to sort it out. But hopefully the automation will test this for me if I send a pull request :).

I'll attach a demo HTML in [Jira](https://issues.apache.org/jira/browse/SPARK-2035).

Author: Daniel Darabos <[email protected]>
Author: Patrick Wendell <[email protected]>

Closes apache#981 from darabos/darabos-call-stack and squashes the following commits:

f7c6bfa [Daniel Darabos] Fix bad merge. I undid 83c226d by Doris.
3d0a48d [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
b857849 [Daniel Darabos] Style: Break long line.
ecb5690 [Daniel Darabos] Include the last Spark method in the full stack trace. Otherwise it is not visible if the stage name is overridden.
d00a85b [Patrick Wendell] Make call sites for stages non-optional and well defined
b9eba24 [Daniel Darabos] Make StageInfo.details non-optional. Add JSON serialization code for the new field. Verify JSON backward compatibility.
4312828 [Daniel Darabos] Remove Mima excludes for CallSite. They should be unnecessary now, with SPARK-2070 fixed.
0920750 [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
a4b1faf [Daniel Darabos] Add Mima exclusions for the CallSite changes it has picked up. They are private methods/classes, so we ought to be safe.
932f810 [Daniel Darabos] Use empty CallSite instead of null in DAGSchedulerSuite. Outside of testing, this parameter always originates in SparkContext.scala, and will never be null. (A sketch of the CallSite type follows this list.)
ccd89d1 [Daniel Darabos] Fix long lines.
ac173e4 [Daniel Darabos] Hide "show details" if there are no details to show.
6182da6 [Daniel Darabos] Set a configurable limit on maximum call stack depth. It can be useful in memory-constrained situations with large numbers of stages.
8fe2e34 [Daniel Darabos] Store call stack for stages, display it on the UI.
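
For reference while reading the diff below: the CallSite type these commits thread through the scheduler is, roughly, a small value type in org.apache.spark.util pairing a one-line summary with the full user stack trace. A simplified sketch, not the exact definition from this commit:

```scala
// Sketch only: the real class lives in org.apache.spark.util and is private[spark].
// `short` is a one-line form such as "count at MyApp.scala:12", used in logs and
// as the stage name; `long` is the user-side stack trace shown under "+show details".
case class CallSite(short: String, long: String)

// The "empty CallSite" used in DAGSchedulerSuite (commit 932f810) is simply:
val emptyCallSite = CallSite("", "")
```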
darabos authored and pwendell committed Jun 17, 2014
1 parent 8cd04c3 commit 23a12ce
Showing 15 changed files with 115 additions and 53 deletions.
21 changes: 21 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -87,3 +87,24 @@ span.kill-link {
span.kill-link a {
color: gray;
}

span.expand-details {
font-size: 10pt;
cursor: pointer;
color: grey;
float: right;
}

.stage-details {
max-height: 100px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stage-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1036,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
*/
private[spark] def getCallSite(): String = {
val defaultCallSite = Utils.getCallSiteInfo
Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
private[spark] def getCallSite(): CallSite = {
Option(getLocalProperty("externalCallSite")) match {
case Some(callSite) => CallSite(callSite, long = "")
case None => Utils.getCallSite
}
}

/**
@@ -1058,11 +1060,11 @@
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

@@ -1143,11 +1145,11 @@
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
val callSite = getCallSite
logInfo("Starting job: " + callSite)
logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}

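A usage note on the getCallSite change above: the "externalCallSite" local property lets user code override the reported call site on the current thread. A minimal sketch — the property key comes from the diff, everything else is illustrative:

```scala
// sc is an existing SparkContext.
// While the property is set, getCallSite() returns CallSite(short = <value>, long = ""),
// so the override appears in "Starting job: ..." log lines and as the stage name,
// and the "+show details" expander is hidden because the details are empty.
sc.setLocalProperty("externalCallSite", "nightly ETL at Pipeline.scala:42")
sc.parallelize(1 to 100).count()

// Clearing the property restores the default Utils.getCallSite behaviour.
sc.setLocalProperty("externalCallSite", null)
```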
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1189,8 +1189,8 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
@transient private[spark] val creationSite = Utils.getCallSite
private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")

private[spark] def elementClassTag: ClassTag[T] = classTag[T]

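Utils.getCallSite, now used for RDD.creationSite above, derives both CallSite fields from the current thread's stack by finding the boundary between Spark-internal frames and user frames. An illustrative sketch of the idea, not the implementation in this commit:

```scala
// Sketch: build a CallSite from the current stack trace. The short form combines the
// innermost Spark method with the first user frame ("map at MyApp.scala:11"); the
// long form keeps the user-side frames for the stage's "details" section.
case class CallSite(short: String, long: String) // as sketched earlier

def callSiteSketch(): CallSite = {
  val trace = Thread.currentThread.getStackTrace.drop(1) // skip getStackTrace itself
  def isSparkFrame(el: StackTraceElement): Boolean =
    el.getClassName.startsWith("org.apache.spark.") || el.getClassName.startsWith("scala.")
  val sparkFrames = trace.takeWhile(isSparkFrame)
  val userFrames  = trace.dropWhile(isSparkFrame)
  val short = (sparkFrames.lastOption, userFrames.headOption) match {
    case (Some(s), Some(u)) => s.getMethodName + " at " + u.getFileName + ":" + u.getLineNumber
    case _                  => "(unknown)"
  }
  CallSite(short, userFrames.mkString("\n"))
}
```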
core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.util.Properties

import org.apache.spark.TaskContext
import org.apache.spark.util.CallSite

/**
* Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {

24 changes: 13 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
import org.apache.spark.util.{SystemClock, Clock, Utils}
import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -195,7 +195,9 @@ class DAGScheduler(
case Some(stage) => stage
case None =>
val stage =
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
newOrUsedStage(
shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
shuffleDep.rdd.creationSite)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
@@ -212,7 +214,7 @@
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]],
jobId: Int,
callSite: Option[String] = None)
callSite: CallSite)
: Stage =
{
val id = nextStageId.getAndIncrement()
@@ -235,7 +237,7 @@
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
callSite: Option[String] = None)
callSite: CallSite)
: Stage =
{
val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -413,7 +415,7 @@ class DAGScheduler(
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: String,
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null): JobWaiter[U] =
@@ -443,7 +445,7 @@
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: String,
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
@@ -452,7 +454,7 @@
waiter.awaitResult() match {
case JobSucceeded => {}
case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite)
logInfo("Failed to run " + callSite.short)
throw exception
}
}
@@ -461,7 +463,7 @@
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
callSite: String,
callSite: CallSite,
timeout: Long,
properties: Properties = null)
: PartialResult[R] =
@@ -666,15 +668,15 @@
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
var finalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -685,7 +687,7 @@
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
job.jobId, callSite, partitions.length, allowLocal))
job.jobId, callSite.short, partitions.length, allowLocal))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite

/**
* Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite

/**
* A stage is a set of independent tasks all computing the same function that need to run as part
@@ -35,6 +36,11 @@ import org.apache.spark.storage.BlockManagerId
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
* The callSite provides a location in user code which relates to the stage. For a shuffle map
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
*
*/
private[spark] class Stage(
val id: Int,
@@ -43,7 +49,7 @@
val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val jobId: Int,
callSite: Option[String])
val callSite: CallSite)
extends Logging {

val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +106,8 @@
id
}

val name = callSite.getOrElse(rdd.getCreationSite)
val name = callSite.short
val details = callSite.long

override def toString = "Stage " + id

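To make the shuffle-map versus result-stage wording in the new Stage doc comment concrete, consider a small job; the file name and line numbers are illustrative, and the exact strings come from Utils.getCallSite:

```scala
// MyApp.scala
val words  = sc.textFile("hdfs:///logs")        // line 10
val pairs  = words.map(w => (w, 1))             // line 11
val counts = pairs.reduceByKey(_ + _).count()   // line 12

// Shuffle map stage: callSite is the creation site of the RDD being shuffled,
//   so the stage name (callSite.short) is roughly "map at MyApp.scala:11".
// Result stage: callSite is the action's call site,
//   so the stage name is roughly "count at MyApp.scala:12".
// In both cases `details` (callSite.long) holds the corresponding user stack trace,
// rendered under "+show details" in the stage table.
```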
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,12 @@ import org.apache.spark.storage.RDDInfo
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
class StageInfo(
val stageId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
val details: String) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +57,6 @@
def fromStage(stage: Stage): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
}
}
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,9 +91,17 @@ private[ui] class StageTableBase(
{s.name}
</a>

val details = if (s.details.nonEmpty) (
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
+show details
</span>
<pre class="stage-details collapsed">{s.details}</pre>
)

listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div> {killLink}{nameLink}</div>)
.getOrElse(<div>{killLink} {nameLink} {details}</div>)
}

protected def stageRow(s: StageInfo): Seq[Node] = {
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
("Details" -> stageInfo.details) ~
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
@@ -469,12 +470,13 @@
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
val details = (json \ "Details").extractOpt[String].getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]

val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
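The extractOpt(...).getOrElse("") above is what keeps event logs written before this change readable: an old log simply has no "Details" key and deserializes with empty details. A sketch of the round trip, assuming JsonProtocol's stageInfoToJson/stageInfoFromJson entry points (these are private[spark], so code like this would live in Spark's own tests):

```scala
// Round trip: the new "Details" field survives serialization.
val info = new StageInfo(
  stageId = 1,
  name = "count at MyApp.scala:12",
  numTasks = 4,
  rddInfos = Seq.empty,
  details = "org.apache.spark.rdd.RDD.count(RDD.scala)\nMyApp$.main(MyApp.scala:12)")
val restored = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
assert(restored.details == info.details)

// Backward compatibility: parsing JSON without a "Details" key yields details == "".
```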
(Diff for the remaining changed files is not shown.)