[SPARK-2298] Encode stage attempt in SparkListener & UI.
A simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x, x)).repartition(3).mapPartitionsWithContext { case (context, iter) =>
  if (context.partitionId == 0) {
    val f = new java.io.File("/tmp/test")
    if (!f.exists) {
      f.mkdir()
      System.exit(0);
    }
  }
  iter
}.count()
```
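The snippet kills the JVM running partition 0 on its first attempt, forcing the stage to be resubmitted; with this change each attempt shows up separately in the listener events and the UI rather than being merged into one row. As a rough sketch of what the new fields look like from a listener's point of view (the listener class and its println output are illustrative, not part of this patch):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical listener: logs which stage attempt each task belongs to,
// using the stageAttemptId field added to these events by this patch.
class StageAttemptLogger extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    println(s"task start: stage ${taskStart.stageId}, attempt ${taskStart.stageAttemptId}")
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"task end: stage ${taskEnd.stageId}, attempt ${taskEnd.stageAttemptId}")
  }
}

// sc.addSparkListener(new StageAttemptLogger())
```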

Author: Reynold Xin <[email protected]>

Closes apache#1545 from rxin/stage-attempt and squashes the following commits:

3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed.
40a6bd5 [Reynold Xin] Updated test suites.
c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite.
b3e2eed [Reynold Xin] Oops previous code didn't compile.
0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in JobProgressListener.
6c08b07 [Reynold Xin] Addressed code review feedback.
4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI.
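As the squashed commits above note, events whose stage is no longer known to the scheduler are tagged with attempt id -1 and dropped by JobProgressListener, and per-stage UI data is now keyed by a (stageId, stageAttemptId) pair (see the ExecutorTable change below), so a retried stage does not overwrite numbers from an earlier attempt. A simplified sketch of that bookkeeping pattern, using placeholder names rather than the actual JobProgressListener code:

```scala
import scala.collection.mutable

// Stand-in for the UI-side bookkeeping: counters are keyed by
// (stageId, stageAttemptId) so retries are tracked independently.
class AttemptAwareProgress {
  private val completedTasks = mutable.HashMap[(Int, Int), Int]()

  def onTaskFinished(stageId: Int, stageAttemptId: Int): Unit = {
    // Attempt -1 marks an event whose stage the scheduler no longer knew about
    // (e.g. the stage was cancelled before the task launched); drop it.
    if (stageAttemptId != -1) {
      val key = (stageId, stageAttemptId)
      completedTasks(key) = completedTasks.getOrElse(key, 0) + 1
    }
  }
}
```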
rxin authored and pwendell committed Aug 20, 2014
1 parent b3ec51b commit fb60bec
Showing 15 changed files with 555 additions and 224 deletions.
77 changes: 48 additions & 29 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -164,7 +164,7 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics)
taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stageAttemptId, metrics)
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
@@ -677,7 +677,10 @@ class DAGScheduler(
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
submitWaitingStages()
}

@@ -695,8 +698,8 @@
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
runningStages.foreach { stage =>
stage.info.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.info))
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
}
@@ -781,7 +784,16 @@ class DAGScheduler(
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
var tasks = ArrayBuffer[Task[_]]()

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = {
if (stage.isShuffleMap) {
(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
} else {
val job = stage.resultOfJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
}

val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
@@ -795,7 +807,8 @@
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
@@ -826,20 +839,19 @@ class DAGScheduler(
return
}

if (stage.isShuffleMap) {
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
val part = stage.rdd.partitions(p)
tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs)
val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, taskBinary, part, locs)
}
} else {
// This is a final stage; figure out its job's missing partitions
val job = stage.resultOfJob.get
for (id <- 0 until job.numPartitions if !job.finished(id)) {
partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ResultTask(stage.id, taskBinary, part, locs, id)
new ResultTask(stage.id, taskBinary, part, locs, id)
}
}

@@ -869,11 +881,11 @@ class DAGScheduler(
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.info.submissionTime = Some(clock.getTime())
stage.latestInfo.submissionTime = Some(clock.getTime())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
listenerBus.post(SparkListenerStageCompleted(stage.info))
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
runningStages -= stage
@@ -892,8 +904,9 @@
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
event.taskInfo, event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
@@ -902,14 +915,19 @@
}
val stage = stageIdToStage(task.stageId)

def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.info.submissionTime match {
def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case _ => "Unknown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.info.completionTime = Some(clock.getTime())
listenerBus.post(SparkListenerStageCompleted(stage.info))
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTime())
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
}
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}
event.reason match {
Expand All @@ -924,7 +942,7 @@ class DAGScheduler(
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
@@ -935,8 +953,8 @@
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
@@ -1029,6 +1047,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
@@ -1142,7 +1161,7 @@ class DAGScheduler(
}
val dependentJobs: Seq[ActiveJob] =
activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
failedStage.info.completionTime = Some(clock.getTime())
failedStage.latestInfo.completionTime = Some(clock.getTime())
for (job <- dependentJobs) {
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
}
@@ -1182,8 +1201,8 @@ class DAGScheduler(
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
stage.info.stageFailed(failureReason)
listenerBus.post(SparkListenerStageCompleted(stage.info))
stage.latestInfo.stageFailed(failureReason)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -39,14 +39,16 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Propert
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
@@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

/**
* Periodic updates from executors.
* @param execId executor id
* @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
taskMetrics: Seq[(Long, Int, TaskMetrics)])
taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
extends SparkListenerEvent

@DeveloperApi
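The periodic executor heartbeat now carries the stage attempt as well, via the widened (taskId, stageId, stageAttemptId, metrics) tuple above. A sketch of a listener consuming those updates (the class name and output are illustrative, and this assumes the onExecutorMetricsUpdate callback on SparkListener for this event):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical consumer of the periodic executor updates, unpacking the
// widened (taskId, stageId, stageAttemptId, metrics) tuple.
class HeartbeatInspector extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    update.taskMetrics.foreach { case (taskId, stageId, stageAttemptId, metrics) =>
      println(s"executor ${update.execId}: task $taskId in stage $stageId (attempt $stageAttemptId), " +
        s"run time so far ${metrics.executorRunTime} ms")
    }
  }
}
```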
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
*
* A single stage can consist of multiple attempts. In that case, the latestInfo field will
* be updated for each attempt.
*
*/
private[spark] class Stage(
val id: Int,
@@ -71,8 +74,8 @@ private[spark] class Stage(
val name = callSite.shortForm
val details = callSite.longForm

/** Pointer to the [StageInfo] object, set by DAGScheduler. */
var info: StageInfo = StageInfo.fromStage(this)
/** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
var latestInfo: StageInfo = StageInfo.fromStage(this)

def isAvailable: Boolean = {
if (!isShuffleMap) {
@@ -116,6 +119,7 @@
}
}

/** Return a new attempt id, starting with 0. */
def newAttemptId(): Int = {
val id = nextAttemptId
nextAttemptId += 1
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
def fromStage(stage: Stage): StageInfo = {
def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
new StageInfo(
stage.id,
stage.attemptId,
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,
stage.details)
}
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl(
execId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId): Boolean = {
val metricsWithStageIds = taskMetrics.flatMap {
case (id, metrics) => {

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
taskMetrics.flatMap { case (id, metrics) =>
taskIdToTaskSetId.get(id)
.flatMap(activeTaskSets.get)
.map(_.stageId)
.map(x => (id, x, metrics))
.map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
}
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,9 +31,5 @@ private[spark] class TaskSet(
val properties: Properties) {
val id: String = stageId + "." + attempt

def kill(interruptThread: Boolean) {
tasks.foreach(_.kill(interruptThread))
}

override def toString: String = "TaskSet " + id
}
core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Page showing executor summary */
private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
/** Stage summary grouped by executors. */
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
private val listener = parent.listener

def toNodeSeq: Seq[Node] = {
@@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}

listener.stageIdToData.get(stageId) match {
listener.stageIdToData.get((stageId, stageAttemptId)) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>