[SPARK-7878] Rename Stage.jobId to firstJobId
The previous name was confusing, because each stage can be associated with
many jobs, and jobId is just the ID of the first job that was associated
with the Stage. This commit also renames some of the method parameters in
DAGScheduler.scala to clarify when the jobId refers to the first job ID
associated with the stage (as opposed to the jobId associated with a job
that's currently being scheduled).

cc markhamstra JoshRosen (hopefully this will help prevent future bugs like SPARK-6880)

Author: Kay Ousterhout <[email protected]>

Closes apache#6418 from kayousterhout/SPARK-7878 and squashes the following commits:

b71a9b8 [Kay Ousterhout] [SPARK-7878] Rename Stage.jobId to firstJobId
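For context on why a stage can be tied to more than one job, here is a minimal, hedged sketch (not part of this commit; the job numbers assume a fresh SparkContext running no other jobs): two actions over the same shuffled RDD submit two jobs, but the DAGScheduler reuses the shuffle map stage created for the first one, so that stage's firstJobId stays at the first job's ID.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FirstJobIdExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("firstJobId-example").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // A single shuffle dependency, introduced by reduceByKey.
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Job 0 creates the ShuffleMapStage for the reduceByKey, so that
    // stage's firstJobId is 0.
    counts.collect()

    // Job 1 reuses the same ShuffleMapStage (its map outputs are already
    // registered), so the stage is now associated with two jobs but keeps
    // firstJobId = 0.
    counts.count()

    sc.stop()
  }
}
```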
kayousterhout committed May 27, 2015
1 parent 4615081 commit ff0ddff
Showing 4 changed files with 33 additions and 37 deletions.
58 changes: 27 additions & 31 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,19 +208,17 @@ class DAGScheduler(

/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage = {
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
registerShuffleDependencies(shuffleDep, firstJobId)
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, jobId)
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage

stage
@@ -230,37 +228,35 @@ class DAGScheduler(
/**
* Helper function to eliminate some code re-use when creating new stages.
*/
private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, jobId)
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, firstJobId)
val id = nextStageId.getAndIncrement()
(parentStages, id)
}

/**
* Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
* newOrUsedShuffleStage. The stage will be associated with the provided jobId.
* newOrUsedShuffleStage. The stage will be associated with the provided firstJobId.
* Production of shuffle map stages should always use newOrUsedShuffleStage, not
* newShuffleMapStage directly.
*/
private def newShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
firstJobId: Int,
callSite: CallSite): ShuffleMapStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
jobId, callSite, shuffleDep)
firstJobId, callSite, shuffleDep)

stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
updateJobIdStageIdMaps(firstJobId, stage)
stage
}

/**
* Create a ResultStage -- either directly for use as a result stage, or as part of the
* (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
* with the provided jobId.
* Create a ResultStage associated with the provided jobId.
*/
private def newResultStage(
rdd: RDD[_],
@@ -277,16 +273,16 @@ class DAGScheduler(

/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
* provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
* provided firstJobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage = {
firstJobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.size
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
@@ -304,10 +300,10 @@ class DAGScheduler(
}

/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
@@ -321,7 +317,7 @@ class DAGScheduler(
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, jobId)
parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
@@ -336,11 +332,11 @@ class DAGScheduler(
}

/** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage = newOrUsedShuffleStage(currentShufDep, jobId)
val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}
@@ -390,7 +386,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
@@ -577,7 +573,7 @@ class DAGScheduler(

private[scheduler] def doCancelAllJobs() {
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation(_,
runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -603,7 +599,7 @@ class DAGScheduler(
clearCacheLocs()
val failedStagesCopy = failedStages.toArray
failedStages.clear()
for (stage <- failedStagesCopy.sortBy(_.jobId)) {
for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -623,7 +619,7 @@ class DAGScheduler(
logTrace("failed: " + failedStages)
val waitingStagesCopy = waitingStages.toArray
waitingStages.clear()
for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -843,7 +839,7 @@ class DAGScheduler(
}
}

val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -909,7 +905,7 @@ class DAGScheduler(
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -1323,7 +1319,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
waitingForVisit.push(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
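To make the lookup-or-create path in getShuffleMapStage concrete (the case Some(stage) => stage branch versus newOrUsedShuffleStage), here is a minimal stand-alone sketch of a stage cache keyed by shuffleId; all names are hypothetical simplifications of the real DAGScheduler internals, not code from this commit.

```scala
import scala.collection.mutable

object StageRegistrySketch {
  // Simplified stand-ins for ShuffleMapStage and the shuffleToMapStage map.
  final case class MapStage(id: Int, shuffleId: Int, firstJobId: Int)

  private val shuffleToMapStage = mutable.Map.empty[Int, MapStage]
  private var nextStageId = 0

  /** Return the cached stage for this shuffle, or create one tagged with firstJobId. */
  def getOrCreateStage(shuffleId: Int, firstJobId: Int): MapStage =
    shuffleToMapStage.getOrElseUpdate(shuffleId, {
      val stage = MapStage(nextStageId, shuffleId, firstJobId)
      nextStageId += 1
      stage
    })

  def main(args: Array[String]): Unit = {
    val fromJob0 = getOrCreateStage(shuffleId = 7, firstJobId = 0) // job 0 creates the stage
    val fromJob1 = getOrCreateStage(shuffleId = 7, firstJobId = 1) // job 1 hits the cache
    assert(fromJob0 eq fromJob1)  // same stage object serves both jobs
    println(fromJob1.firstJobId)  // prints 0, the first job's ID
  }
}
```

Later jobs that reuse the stage never overwrite its firstJobId, which is exactly why the old name jobId was misleading.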
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -28,9 +28,9 @@ private[spark] class ResultStage(
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
jobId: Int,
firstJobId: Int,
callSite: CallSite)
extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

// The active job for this result stage. Will be empty if the job has already finished
// (e.g., because the job was cancelled).
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -30,10 +30,10 @@ private[spark] class ShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
jobId: Int,
firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

override def toString: String = "ShuffleMapStage " + id

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.CallSite
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
@@ -51,7 +51,7 @@ private[spark] abstract class Stage(
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val jobId: Int,
val firstJobId: Int,
val callSite: CallSite)
extends Logging {

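The Stage.scala scaladoc above notes that firstJobId drives FIFO ordering, and the DAGScheduler hunks apply it via sortBy(_.firstJobId) when resubmitting waiting and failed stages. A small hedged sketch of that ordering, using a hypothetical simplified Stage type rather than Spark's own:

```scala
object FifoResubmitSketch {
  final case class Stage(id: Int, firstJobId: Int)

  def main(args: Array[String]): Unit = {
    // Stages that need to be (re)submitted, in arbitrary arrival order.
    val failedStages = Seq(
      Stage(id = 5, firstJobId = 2),
      Stage(id = 3, firstJobId = 0),
      Stage(id = 4, firstJobId = 1))

    // Same idea as failedStagesCopy.sortBy(_.firstJobId) in the diff:
    // stages first submitted by earlier jobs are resubmitted first.
    for (stage <- failedStages.sortBy(_.firstJobId)) {
      println(s"resubmitting stage ${stage.id} (first submitted by job ${stage.firstJobId})")
    }
  }
}
```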
