Skip to content

Commit

Permalink
[SPARK-20715] Store MapStatuses only in MapOutputTracker, not Shuffle…
Browse files Browse the repository at this point in the history
…MapStage

## What changes were proposed in this pull request?

This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs.

### Background

In Spark there are currently two places where MapStatuses are tracked:

- The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks.
- Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage.

This duplication adds complexity and creates the potential for certain types of correctness bugs.  Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because `ShuffleMapStage` thinks all maps are available).

I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state.

### Why we only need to track a single location for each map output

I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary.

First, note that this adds memory/object bloat to the driver we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to be a significant amount of resources.

Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen:

- In normal operation (no task failures) we'll only run each task once and thus will have at most one output.
- If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs.
- There is a [comment in `TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113) which suggests that running tasks are not killed if a task set becomes a zombie. However:
  - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs.
  - If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream match stage) then I believe that the "failedEpoch" will be updated which may cause map outputs from still-running tasks to [be ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213). (I'm not 100% sure on this point, though).
- Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is an executor lost, which will have most likely caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapTask only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages.

Given this, this patch chooses to do away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the `ShuffleMapTask` and `MapOutputTracker`'s copies of this state, paving the way for storing it only in the `MapOutputTracker`.

### Overview of other changes

- Significantly simplified the cache / lock management inside of the `MapOutputTrackerMaster`:
  - The old code had several parallel `HashMap`s which had to be guarded by maps of `Object`s which were used as locks. This code was somewhat complicated to follow.
  - The new code uses a new `ShuffleStatus` class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic.
- Moved more code out of the shared `MapOutputTracker` abstract base class and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor.
- Removed a bunch of code from the `DAGScheduler` which was used to synchronize information from the `MapOutputTracker` to `ShuffleMapStage`.
- Added comments to clarify the role of `MapOutputTrackerMaster`'s `epoch` in invalidating executor-side shuffle map output caches.

I will comment on these changes via inline GitHub review comments.

/cc hvanhovell and rxin (whom I discussed this with offline), tgravescs (who recently worked on caching of serialized MapOutputStatuses), and kayousterhout and markhamstra (for scheduler changes).

## How was this patch tested?

Existing tests. I purposely avoided making interface / API which would require significant updates or modifications to test code.

Author: Josh Rosen <[email protected]>

Closes apache#17955 from JoshRosen/map-output-tracker-rewrite.
  • Loading branch information
JoshRosen committed Jun 12, 2017
1 parent f48273c commit 3476390
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 389 deletions.
636 changes: 360 additions & 276 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,14 @@ private[spark] class Executor(
throw new TaskKilledException(killReason.get)
}

logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// The purpose of updating the epoch here is to invalidate executor map output status cache
// in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
// MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
// we don't need to make any special calls here.
if (!isLocal) {
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
}

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
Expand Down
51 changes: 14 additions & 37 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -328,25 +328,14 @@ class DAGScheduler(
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
Expand Down Expand Up @@ -1217,7 +1206,8 @@ class DAGScheduler(
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
// Remove the task's partition from pending partitions. This may have already been
// done above, but will not have been done yet in cases where the task attempt was
// from an earlier attempt of the stage (i.e., not the attempt that's currently
Expand All @@ -1234,16 +1224,14 @@ class DAGScheduler(
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)

// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
// This call to increment the epoch may not be strictly necessary, but it is retained
// for now in order to minimize the changes in behavior from an earlier version of the
// code. This existing behavior of always incrementing the epoch following any
// successful shuffle map stage completion may have benefits by causing unneeded
// cached map outputs to be cleaned up earlier on executors. In the future we can
// consider removing this call, but this will require some extra investigation.
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
mapOutputTracker.incrementEpoch()

clearCacheLocs()

Expand Down Expand Up @@ -1343,7 +1331,6 @@ class DAGScheduler(
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

Expand Down Expand Up @@ -1393,17 +1380,7 @@ class DAGScheduler(

if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
}
if (shuffleIdToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}
mapOutputTracker.removeOutputsOnExecutor(execId)
clearCacheLocs()
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.spark.scheduler

import scala.collection.mutable.HashSet

import org.apache.spark.ShuffleDependency
import org.apache.spark.{MapOutputTrackerMaster, ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite

/**
Expand All @@ -42,13 +41,12 @@ private[spark] class ShuffleMapStage(
parents: List[Stage],
firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
val shuffleDep: ShuffleDependency[_, _, _],
mapOutputTrackerMaster: MapOutputTrackerMaster)
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

private[this] var _mapStageJobs: List[ActiveJob] = Nil

private[this] var _numAvailableOutputs: Int = 0

/**
* Partitions that either haven't yet been computed, or that were computed on an executor
* that has since been lost, so should be re-computed. This variable is used by the
Expand All @@ -60,13 +58,6 @@ private[spark] class ShuffleMapStage(
*/
val pendingPartitions = new HashSet[Int]

/**
* List of [[MapStatus]] for each partition. The index of the array is the map partition id,
* and each value in the array is the list of possible [[MapStatus]] for a partition
* (a single task might run multiple times).
*/
private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

override def toString: String = "ShuffleMapStage " + id

/**
Expand All @@ -88,69 +79,18 @@ private[spark] class ShuffleMapStage(
/**
* Number of partitions that have shuffle outputs.
* When this reaches [[numPartitions]], this map stage is ready.
* This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.
*/
def numAvailableOutputs: Int = _numAvailableOutputs
def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

/**
* Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
* This should be the same as `outputLocs.contains(Nil)`.
*/
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
def isAvailable: Boolean = numAvailableOutputs == numPartitions

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
assert(missing.size == numPartitions - _numAvailableOutputs,
s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
missing
}

def addOutputLoc(partition: Int, status: MapStatus): Unit = {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
if (prevList == Nil) {
_numAvailableOutputs += 1
}
}

def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
_numAvailableOutputs -= 1
}
}

/**
* Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
* value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
* that position is filled with null.
*/
def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
outputLocs.map(_.headOption.orNull)
}

/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location.executorId == execId)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
if (becameUnavailable) {
logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
this, execId, _numAvailableOutputs, numPartitions, isAvailable))
}
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](

var backend: SchedulerBackend = null

val mapOutputTracker = SparkEnv.get.mapOutputTracker
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,21 @@ class MapOutputTrackerSuite extends SparkFunSuite {
slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)

masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
// This is expected to fail because no outputs have been registered for the shuffle.
intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }

val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
assert(0 == masterTracker.getNumCachedSerializedBroadcast)

val masterTrackerEpochBeforeLossOfMapOutput = masterTracker.getEpoch
masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
masterTracker.incrementEpoch()
assert(masterTracker.getEpoch > masterTrackerEpochBeforeLossOfMapOutput)
slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) }

Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleHandle = manager.registerShuffle(0, 1, shuffleDep)
mapTrackerMaster.registerShuffle(0, 1)

// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
Expand Down Expand Up @@ -393,7 +394,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC

// register one of the map outputs -- doesn't matter which one
mapOutput1.foreach { case mapStatus =>
mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))
mapTrackerMaster.registerMapOutput(0, 0, mapStatus)
}

val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
sc = new SparkContext(conf)
val scheduler = mock[TaskSchedulerImpl]
when(scheduler.sc).thenReturn(sc)
when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker)
when(scheduler.mapOutputTracker).thenReturn(
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])
scheduler
}

Expand Down

0 comments on commit 3476390

Please sign in to comment.