SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small

As described in https://issues.apache.org/jira/browse/SPARK-2787, sort-based shuffle is currently more expensive than hash-based shuffle for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and again when merging those files object-by-object). This patch adds a code path that simply writes a separate file per partition when the number of output partitions is small, and concatenates them at the end to produce a single sorted output file.
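
As a rough illustration of the bypass idea, here is a standalone sketch (not the code added in this patch; names such as BypassMergeSortSketch and writePartitionedOutput are invented for the example):

    import java.io._

    // Sketch of the bypass path: when there is no aggregation or ordering and the number of
    // reduce partitions is small, write one temporary file per partition and then concatenate
    // them into a single output file, instead of sorting records by partition ID and merging
    // spilled files. All names here are illustrative, not Spark's actual API.
    object BypassMergeSortSketch {
      def writePartitionedOutput(
          records: Iterator[(Int, Array[Byte])],  // (partition ID, serialized record) pairs
          numPartitions: Int,
          outputFile: File): Array[Long] = {
        // One temporary file and stream per reduce partition
        val partFiles = Array.fill(numPartitions)(File.createTempFile("shuffle-part", ".tmp"))
        val streams = partFiles.map(f => new BufferedOutputStream(new FileOutputStream(f)))

        // Each record goes straight into its partition's file; no sort or merge is needed
        for ((partId, bytes) <- records) {
          streams(partId).write(bytes)
        }
        streams.foreach(_.close())

        // Concatenate the per-partition files, recording each segment's length so that an
        // index file (one offset per partition) could be derived from the result
        val lengths = new Array[Long](numPartitions)
        val out = new FileOutputStream(outputFile)
        try {
          for (i <- 0 until numPartitions) {
            val in = new FileInputStream(partFiles(i))
            try {
              val ch = in.getChannel
              val size = ch.size()
              var pos = 0L
              while (pos < size) {
                pos += ch.transferTo(pos, size - pos, out.getChannel)
              }
              lengths(i) = size
            } finally {
              in.close()
            }
            partFiles(i).delete()
          }
        } finally {
          out.close()
        }
        lengths
      }

      def main(args: Array[String]): Unit = {
        val recs = Iterator((0, "a"), (2, "bb"), (1, "c"), (0, "dd")).map {
          case (p, s) => (p, s.getBytes("UTF-8"))
        }
        val lengths = writePartitionedOutput(recs, numPartitions = 3, new File("shuffle-output.bin"))
        println(lengths.mkString(", "))  // per-partition segment lengths: 3, 1, 2
      }
    }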

On the unit test side, I added tests that both force and avoid this bypass path, and checked that our tests for other features (e.g. all the operations) cover both cases.

Author: Matei Zaharia <[email protected]>

Closes apache#1799 from mateiz/SPARK-2787 and squashes the following commits:

88cf26a [Matei Zaharia] Fix rebase
10233af [Matei Zaharia] Review comments
398cb95 [Matei Zaharia] Fix looking up shuffle manager in conf
ca3efd9 [Matei Zaharia] Add docs for shuffle manager properties, and allow short names for them
d0ae3c5 [Matei Zaharia] Fix some comments
90d084f [Matei Zaharia] Add code path to bypass merge-sort in ExternalSorter, and tests
31e5d7c [Matei Zaharia] Move existing logic for writing partitioned files into ExternalSorter
mateiz authored and rxin committed Aug 8, 2014
1 parent 32096c2 commit 6906b69
Showing 6 changed files with 407 additions and 118 deletions.
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = conf.get(propertyName, defaultClassName)
val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
@@ -178,11 +176,17 @@
}
}

val serializer = instantiateClass[Serializer](
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClass[Serializer](
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@
"."
}

val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

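For context, a hypothetical usage sketch (not part of this diff): with the short names added above, an application could select the sort-based shuffle manager as shown below, using the standard SparkConf API (the app name is arbitrary).

    import org.apache.spark.SparkConf

    // "sort" and "hash" now resolve to the corresponding ShuffleManager classes;
    // a fully qualified class name still works as before.
    val conf = new SparkConf()
      .setAppName("shuffle-example")
      .set("spark.shuffle.manager", "sort")
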
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -58,7 +58,7 @@ private[spark] class HashShuffleReader[K, C](
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.write(aggregatedIter)
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
private var indexFile: File = null

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -57,78 +58,36 @@

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
// Get an iterator with the elements for each partition ID
val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we
// don't care whether the keys get sorted in each partition; that will be done on the
// reduce side if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}

// Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
// serve different ranges of this file using an index file that we create at the end.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
writeMetrics)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
// out where each block begins and ends.
outputFile = blockManager.diskBlockManager.getFile(blockId)
indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")

val diskBlockManager = blockManager.diskBlockManager
val indexFile = diskBlockManager.getFile(blockId.name + ".index")
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
var i = 0
while (i < numPartitions + 1) {
out.writeLong(offsets(i))
i += 1
}
} finally {
out.close()
}
val partitionLengths = sorter.writePartitionedFile(blockId, context)

// Register our map output with the ShuffleBlockManager, which handles cleaning it over time
blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

mapStatus = new MapStatus(blockManager.blockManagerId,
lengths.map(MapOutputTracker.compressSize))
partitionLengths.map(MapOutputTracker.compressSize))
}

/** Close this writer, passing along whether the map completed */
@@ -145,6 +104,9 @@ private[spark] class SortShuffleWriter[K, V, C](
if (outputFile != null) {
outputFile.delete()
}
if (indexFile != null) {
indexFile.delete()
}
return None
}
} finally {