Skip to content

Commit

Permalink
[SPARK-9852] Let reduce tasks fetch multiple map output partitions
Browse files Browse the repository at this point in the history
This makes two changes:

- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task

I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.

Author: Matei Zaharia <[email protected]>

Closes apache#8844 from mateiz/spark-9852.
  • Loading branch information
mateiz committed Sep 25, 2015
1 parent 8023242 commit 21fd12c
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 124 deletions.
79 changes: 68 additions & 11 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,25 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
}

/**
* Called from executors to get the server URIs and output sizes for each shuffle block that
* needs to be read from a given range of map output partitions (startPartition is included but
* endPartition is excluded from the range).
*
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
}
}

Expand Down Expand Up @@ -262,6 +276,21 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
/** Cache a serialized version of the output statuses for each shuffle to send them out faster */
private var cacheEpoch = epoch

/** Whether to compute locality preferences for reduce tasks */
private val shuffleLocalityEnabled = conf.getBoolean("spark.shuffle.reduceLocality.enabled", true)

// Number of map and reduce tasks above which we do not assign preferred locations based on map
// output sizes. We limit the size of jobs for which assign preferred locations as computing the
// top locations by size becomes expensive.
private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

// Fraction of total map output that must be at a location for it to considered as a preferred
// location for a reduce task. Making this larger will focus on fewer locations where most data
// can be read locally, but may lead to more delay in scheduling if those locations are busy.
private val REDUCER_PREF_LOCS_FRACTION = 0.2

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
Expand Down Expand Up @@ -322,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

/**
* Return the preferred hosts on which to run the given map output partition in a given shuffle,
* i.e. the nodes that the most outputs for that partition are on.
*
* @param dep shuffle dependency object
* @param partitionId map output partition that we want to read
* @return a sequence of host names
*/
def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
: Seq[String] = {
if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
if (blockManagerIds.nonEmpty) {
blockManagerIds.get.map(_.host)
} else {
Nil
}
} else {
Nil
}
}

/**
* Return a list of locations that each have fraction of map output greater than the specified
* threshold.
Expand Down Expand Up @@ -460,34 +513,38 @@ private[spark] object MapOutputTracker extends Logging {
}

/**
* Converts an array of MapStatuses for a given reduce ID to a sequence that, for each block
* manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at that
* block manager.
* Given an array of map statuses and a range of map output partitions, returns a sequence that,
* for each block manager ID, lists the shuffle block IDs and corresponding shuffle block sizes
* stored at that block manager.
*
* If any of the statuses is null (indicating a missing location due to a failed mapper),
* throws a FetchFailedException.
*
* @param shuffleId Identifier for the shuffle
* @param reduceId Identifier for the reduce task
* @param startPartition Start of map output partition ID range (included in range)
* @param endPartition End of map output partition ID range (excluded from range)
* @param statuses List of map statuses, indexed by map ID.
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
* and the second item is a sequence of (shuffle block id, shuffle block size) tuples
* and the second item is a sequence of (shuffle block ID, shuffle block size) tuples
* describing the shuffle blocks that are stored at that block manager.
*/
private def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
for ((status, mapId) <- statuses.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, reduceId, errorMessage)
throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
} else {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, reduceId), status.getSizeForBlock(reduceId)))
for (part <- startPartition until endPartition) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}

override def getPreferredLocations(partition: Partition): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
tracker.getPreferredLocationsForShuffle(dep, partition.index)
}

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,6 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

// Flag to control if reduce tasks are assigned preferred locations
private val shuffleLocalityEnabled =
sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
// Number of map, reduce tasks above which we do not assign preferred locations
// based on map output sizes. We limit the size of jobs for which assign preferred locations
// as computing the top locations by size becomes expensive.
private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

// Fraction of total map output that must be at a location for it to considered as a preferred
// location for a reduce task.
// Making this larger will focus on fewer locations where most data can be read locally, but
// may lead to more delay in scheduling if those locations are busy.
private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2

/**
* Called by the TaskSetManager to report task's starting.
*/
Expand Down Expand Up @@ -1570,25 +1554,10 @@ class DAGScheduler(
return locs
}
}

case _ =>
}

// If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
// have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
rdd.dependencies.foreach {
case s: ShuffleDependency[_, _, _] =>
if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
// Get the preferred map output locations for this reducer
val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
if (topLocsForReducer.nonEmpty) {
return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
}
}
case _ =>
}
}
Nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

/**
* Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
* requesting them from other nodes' block stores.
*/
private[spark] class BlockStoreShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
Expand All @@ -32,9 +36,6 @@ private[spark] class BlockStoreShuffleReader[K, C](
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
extends ShuffleReader[K, C] with Logging {

require(endPartition == startPartition + 1,
"Hash shuffle currently only supports fetching one partition")

private val dep = handle.dependency

/** Read the combined key-values for this reduce task */
Expand All @@ -43,7 +44,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition),
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.scheduler

import org.apache.spark.rdd.{ShuffledRDDPartition, RDD, ShuffledRDD}
import org.apache.spark._

object AdaptiveSchedulingSuiteState {
Expand All @@ -28,26 +27,10 @@ object AdaptiveSchedulingSuiteState {
}
}

/** A special ShuffledRDD where we can pass a ShuffleDependency object to use */
class CustomShuffledRDD[K, V, C](@transient dep: ShuffleDependency[K, V, C])
extends RDD[(K, C)](dep.rdd.context, Seq(dep)) {

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}

override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new ShuffledRDDPartition(i))
}
}

class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
test("simple use of submitMapStage") {
try {
sc = new SparkContext("local[1,2]", "test")
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 3, 3).map { x =>
AdaptiveSchedulingSuiteState.tasksRun += 1
(x, x)
Expand All @@ -62,4 +45,32 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
AdaptiveSchedulingSuiteState.clear()
}
}

test("fetching multiple map output partitions per reduce") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 2))
assert(shuffled.partitions.length === 2)
assert(shuffled.glom().map(_.toSet).collect().toSet == Set(Set((0, 0), (1, 1)), Set((2, 2))))
}

test("fetching all map output partitions in one reduce") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
// Also create lots of hash partitions so that some of them are empty
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(5))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0))
assert(shuffled.partitions.length === 1)
assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
}

test("more reduce tasks than map output partitions") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2))
assert(shuffled.partitions.length === 7)
assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
}
}
Loading

0 comments on commit 21fd12c

Please sign in to comment.