Skip to content

Commit

Permalink
[SPARK-5259] [CORE] don't submit stage until its dependencies map out…
Browse files Browse the repository at this point in the history
…puts are registered

Track pending tasks by partition ID instead of Task objects.

Before this change, failure & retry could result in a case where a stage got submitted before the map output from its dependencies get registered.  This was due to an error in the condition for registering map outputs.

Author: hushan[胡珊] <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #7699 from squito/SPARK-5259.
  • Loading branch information
suyanNone authored and squito committed Sep 21, 2015
1 parent 331f0b1 commit b78c65b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
stage.pendingPartitions.clear()

// First figure out the indexes of partition ids to compute.
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
Expand Down Expand Up @@ -1060,8 +1060,8 @@ class DAGScheduler(

if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
Expand Down Expand Up @@ -1152,7 +1152,7 @@ class DAGScheduler(
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
Expand Down Expand Up @@ -1198,7 +1198,7 @@ class DAGScheduler(
shuffleStage.addOutputLoc(smt.partitionId, status)
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
Expand Down Expand Up @@ -1242,7 +1242,7 @@ class DAGScheduler(

case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task
stage.pendingPartitions += task.partitionId

case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private[scheduler] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

var pendingTasks = new HashSet[Task[_]]
val pendingPartitions = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ private[spark] class TaskSetManager(
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
taskName, taskId, host, taskLocality, serializedTask.limit))
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")

sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
Expand Down
197 changes: 182 additions & 15 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
Expand All @@ -490,7 +490,7 @@ class DAGSchedulerSuite
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
// we can see both result blocks now
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
Expand Down Expand Up @@ -782,8 +782,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
Expand Down Expand Up @@ -1035,6 +1035,173 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

/**
* This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we
* have completions from both the first & second attempt of stage 1. So all the map output is
* available before we finish any task set for stage 1. We want to make sure that we don't
* submit stage 2 until the map output for stage 1 is registered
*/
test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, null)
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

// things start out smoothly, stage 0 completes with no issues
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
))

// then one executor dies, and a task fails in stage 1
runEvent(ExecutorLost("exec-hostA"))
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
null,
null,
createFakeTaskInfo(),
null))

// so we resubmit stage 0, which completes happily
scheduler.resubmitFailedStages()
val stage0Resubmit = taskSets(2)
assert(stage0Resubmit.stageId == 0)
assert(stage0Resubmit.stageAttemptId === 1)
val task = stage0Resubmit.tasks(0)
assert(task.partitionId === 2)
runEvent(CompletionEvent(
task,
Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.length),
null,
createFakeTaskInfo(),
null))

// now here is where things get tricky : we will now have a task set representing
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
// stage 1 still going
val stage1Resubmit = taskSets(3)
assert(stage1Resubmit.stageId == 1)
assert(stage1Resubmit.stageAttemptId === 1)
assert(stage1Resubmit.tasks.length === 3)

// we'll have some tasks finish from the first attempt, and some finish from the second attempt,
// so that we actually have all stage outputs, though no attempt has completed all its
// tasks
runEvent(CompletionEvent(
taskSets(3).tasks(0),
Success,
makeMapStatus("hostC", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))
runEvent(CompletionEvent(
taskSets(3).tasks(1),
Success,
makeMapStatus("hostC", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))
// late task finish from the first attempt
runEvent(CompletionEvent(
taskSets(1).tasks(2),
Success,
makeMapStatus("hostB", reduceRdd.partitions.length),
null,
createFakeTaskInfo(),
null))

// What should happen now is that we submit stage 2. However, we might not see an error
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
// we can check some conditions.
// Note that the really important thing here is not so much that we submit stage 2 *immediately*
// but that we don't end up with some error from these interleaved completions. It would also
// be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
// all its tasks complete

// check that we have all the map output for stage 0 (it should have been there even before
// the last round of completions from stage 1, but just to double check it hasn't been messed
// up) and also the newly available stage 1
val stageToReduceIdxs = Seq(
0 -> (0 until 3),
1 -> (0 until 1)
)
for {
(stage, reduceIdxs) <- stageToReduceIdxs
reduceIdx <- reduceIdxs
} {
// this would throw an exception if the map status hadn't been registered
val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
// really we should have already thrown an exception rather than fail either of these
// asserts, but just to be extra defensive let's double check the statuses are OK
assert(statuses != null)
assert(statuses.nonEmpty)
}

// and check that stage 2 has been submitted
assert(taskSets.size == 5)
val stage2TaskSet = taskSets(4)
assert(stage2TaskSet.stageId == 2)
assert(stage2TaskSet.stageAttemptId == 0)
}

/**
* We lose an executor after completing some shuffle map tasks on it. Those tasks get
* resubmitted, and when they finish the job completes normally
*/
test("register map outputs correctly after ExecutorLost and task Resubmitted") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, null)
val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
submit(reduceRdd, Array(0))

// complete some of the tasks from the first stage, on one host
runEvent(CompletionEvent(
taskSets(0).tasks(0), Success,
makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(
taskSets(0).tasks(1), Success,
makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))

// now that host goes down
runEvent(ExecutorLost("exec-hostA"))

// so we resubmit those tasks
runEvent(CompletionEvent(
taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(
taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))

// now complete everything on a different host
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))
))

// now we should submit stage 1, and the map output from stage 0 should be registered

// check that we have all the map output for stage 0
(0 until reduceRdd.partitions.length).foreach { reduceIdx =>
val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
// really we should have already thrown an exception rather than fail either of these
// asserts, but just to be extra defensive let's double check the statuses are OK
assert(statuses != null)
assert(statuses.nonEmpty)
}

// and check that stage 1 has been submitted
assert(taskSets.size == 2)
val stage1TaskSet = taskSets(1)
assert(stage1TaskSet.stageId == 1)
assert(stage1TaskSet.stageAttemptId == 0)
}

/**
* Makes sure that failures of stage used by multiple jobs are correctly handled.
*
Expand Down Expand Up @@ -1393,8 +1560,8 @@ class DAGSchedulerSuite
// Submit a map stage by itself
submitMapStage(shuffleDep)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
assert(results.size === 1)
results.clear()
assertDataStructuresEmpty()
Expand All @@ -1407,7 +1574,7 @@ class DAGSchedulerSuite
// Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
// from, then TaskSet 3 will run the reduce stage
scheduler.resubmitFailedStages()
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
results.clear()
Expand Down Expand Up @@ -1452,33 +1619,33 @@ class DAGSchedulerSuite
// Complete the first stage
assert(taskSets(0).stageId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", rdd1.partitions.size)),
(Success, makeMapStatus("hostB", rdd1.partitions.size))))
(Success, makeMapStatus("hostA", rdd1.partitions.length)),
(Success, makeMapStatus("hostB", rdd1.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
assert(listener1.results.size === 1)

// When attempting the second stage, show a fetch failure
assert(taskSets(1).stageId === 1)
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostA", rdd2.partitions.size)),
(Success, makeMapStatus("hostA", rdd2.partitions.length)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
assert(listener2.results.size === 0) // Second stage listener should not have a result yet

// Stage 0 should now be running as task set 2; make its task succeed
assert(taskSets(2).stageId === 0)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.size))))
(Success, makeMapStatus("hostC", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
assert(listener2.results.size === 0) // Second stage listener should still not have a result

// Stage 1 should now be running as task set 3; make its first task succeed
assert(taskSets(3).stageId === 1)
complete(taskSets(3), Seq(
(Success, makeMapStatus("hostB", rdd2.partitions.size)),
(Success, makeMapStatus("hostD", rdd2.partitions.size))))
(Success, makeMapStatus("hostB", rdd2.partitions.length)),
(Success, makeMapStatus("hostD", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
assert(listener2.results.size === 1)
Expand All @@ -1494,7 +1661,7 @@ class DAGSchedulerSuite
// TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
assert(taskSets(5).stageId === 1)
complete(taskSets(5), Seq(
(Success, makeMapStatus("hostE", rdd2.partitions.size))))
(Success, makeMapStatus("hostE", rdd2.partitions.length))))
complete(taskSets(6), Seq(
(Success, 53)))
assert(listener3.results === Map(0 -> 52, 1 -> 53))
Expand Down

0 comments on commit b78c65b

Please sign in to comment.