Skip to content

Commit

Permalink
[SPARK-6627] Finished rename to ShuffleBlockResolver
Browse files Browse the repository at this point in the history
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.

I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class?

cc pwendell

Author: Kay Ousterhout <[email protected]>

Closes apache#5764 from kayousterhout/SPARK-6627 and squashes the following commits:

43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
  • Loading branch information
kayousterhout authored and JoshRosen committed May 8, 2015
1 parent 2d05f32 commit 4b3bb0e
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.shuffle

import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
Expand Down Expand Up @@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
private[spark] class FileShuffleBlockResolver(conf: SparkConf)
extends ShuffleBlockResolver with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)
Expand Down Expand Up @@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}

private[spark]
object FileShuffleBlockManager {
private[spark] object FileShuffleBlockResolver {
/**
* A group of shuffle files, one per reducer.
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.shuffle

import java.io._
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

Expand All @@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
import org.apache.spark.util.Utils

import IndexShuffleBlockManager.NOOP_REDUCE_ID
import IndexShuffleBlockResolver.NOOP_REDUCE_ID

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
Expand All @@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID
*
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {

private lazy val blockManager = SparkEnv.get.blockManager

Expand Down Expand Up @@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
override def stop(): Unit = {}
}

private[spark] object IndexShuffleBlockManager {
private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store and BlockObjectWriter.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {

private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)

/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
Expand Down Expand Up @@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeShuffle(shuffleId)
}

override def shuffleBlockResolver: FileShuffleBlockManager = {
fileShuffleBlockManager
override def shuffleBlockResolver: FileShuffleBlockResolver = {
fileShuffleBlockResolver
}

/** Shut down this ShuffleManager. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
import org.apache.spark.storage.BlockObjectWriter

private[spark] class HashShuffleWriter[K, V](
shuffleBlockManager: FileShuffleBlockManager,
shuffleBlockResolver: FileShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
Expand All @@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](

private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)

/** Write a bunch of records to this task's output */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

/**
Expand Down Expand Up @@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
true
}

override def shuffleBlockResolver: IndexShuffleBlockManager = {
indexShuffleBlockManager
override def shuffleBlockResolver: IndexShuffleBlockResolver = {
indexShuffleBlockResolver
}

/** Shut down this ShuffleManager. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
shuffleBlockManager: IndexShuffleBlockManager,
shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
Expand Down Expand Up @@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
Expand All @@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
return Option(mapStatus)
} else {
// The map task failed, so delete our output data.
shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,11 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
val shuffleBlockManager = shuffleManager.shuffleBlockResolver
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.FileShuffleBlockManager
import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.{ShuffleBlockId, FileSegment}

class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
Expand All @@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {

sc = new SparkContext("local", "test", conf)

val shuffleBlockManager =
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
val shuffleBlockResolver =
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]

val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle1.writers) {
writer.write("test1", "value")
Expand All @@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)

val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)

for (writer <- shuffle2.writers) {
Expand All @@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// of block based on remaining data in file : which could mess things up when there is
// concurrent read and writes happening to the same shuffle group.

val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
for (writer <- shuffle3.writers) {
writer.write("test3", "value")
Expand All @@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
writer.commitAndClose()
}
// check before we register.
checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffleBlockManager.removeShuffle(1)
checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffleBlockResolver.removeShuffle(1)
}

def writeToFile(file: File, numBytes: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);

private final ExternalShuffleBlockManager blockManager;
private final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockManager blockManager) {
ExternalShuffleBlockResolver blockManager) {
this.streamManager = streamManager;
this.blockManager = blockManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
* from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
* from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
*
* Executors with shuffle file consolidation are not currently supported, as the index is stored in
* the Executor's memory, unlike the IndexShuffleBlockManager.
* the Executor's memory, unlike the IndexShuffleBlockResolver.
*/
public class ExternalShuffleBlockManager {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);

// Map containing all registered executors' metadata.
private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
Expand All @@ -60,15 +60,15 @@ public class ExternalShuffleBlockManager {

private final TransportConf conf;

public ExternalShuffleBlockManager(TransportConf conf) {
public ExternalShuffleBlockResolver(TransportConf conf) {
this(conf, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
Expand Down Expand Up @@ -168,7 +168,7 @@ private void deleteExecutorDirs(String[] dirs) {

/**
* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockManager.
* This logic is from FileShuffleBlockResolver.
*/
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
Expand All @@ -178,7 +178,7 @@ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor,

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);

OneForOneStreamManager streamManager;
ExternalShuffleBlockManager blockManager;
ExternalShuffleBlockResolver blockResolver;
RpcHandler handler;

@Before
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
blockManager = mock(ExternalShuffleBlockManager.class);
handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
blockResolver = mock(ExternalShuffleBlockResolver.class);
handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
}

@Test
Expand All @@ -62,7 +62,7 @@ public void testRegisterExecutor() {
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
handler.receive(client, registerMessage, callback);
verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);

verify(callback, times(1)).onSuccess((byte[]) any());
verify(callback, never()).onFailure((Throwable) any());
Expand All @@ -75,12 +75,12 @@ public void testOpenShuffleBlocks() {

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
handler.receive(client, openBlocks, callback);
verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");

ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
verify(callback, times(1)).onSuccess(response.capture());
Expand Down
Loading

0 comments on commit 4b3bb0e

Please sign in to comment.