Skip to content

Commit

Permalink
[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore
Browse files Browse the repository at this point in the history
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.

This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.

This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet).

Author: Josh Rosen <[email protected]>

Closes apache#11748 from JoshRosen/chunked-block-serialization.
  • Loading branch information
JoshRosen committed Mar 18, 2016
1 parent 6037ed0 commit 6c2d894
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
*/
public final class NettyManagedBuffer extends ManagedBuffer {
public class NettyManagedBuffer extends ManagedBuffer {
private final ByteBuf buf;

public NettyManagedBuffer(ByteBuf buf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}

/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
Expand Down Expand Up @@ -107,18 +107,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
val pieceId = BroadcastBlockId(id, "piece" + i)
if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
blocks.length
}

/** Fetch torrent blocks from the driver and/or other executors. */
private def readBlocks(): Array[ByteBuffer] = {
private def readBlocks(): Array[ChunkedByteBuffer] = {
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these chunks from this executor as well.
val blocks = new Array[ByteBuffer](numBlocks)
val blocks = new Array[ChunkedByteBuffer](numBlocks)
val bm = SparkEnv.get.blockManager

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
Expand Down Expand Up @@ -182,7 +183,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
val blocks = readBlocks().flatMap(_.getChunks())
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

val obj = TorrentBroadcast.unBlockifyObject[T](
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer

/**
* Spark executor, backed by a threadpool to run tasks.
Expand Down Expand Up @@ -297,7 +298,9 @@ private[spark] class Executor(
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
return
}
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
serializedTaskResult.get.toByteBuffer)
sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
Expand Down
104 changes: 48 additions & 56 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.ByteBuffer

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NonFatal

import sun.nio.ch.DirectBuffer

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryManager
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
Expand All @@ -43,6 +41,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.util._
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}

/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
Expand Down Expand Up @@ -296,7 +295,7 @@ private[spark] class BlockManager(
* Put the block locally, using the given storage level.
*/
override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
putBytes(blockId, data.nioByteBuffer(), level)
putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
}

/**
Expand Down Expand Up @@ -444,7 +443,7 @@ private[spark] class BlockManager(
/**
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
Expand All @@ -453,7 +452,8 @@ private[spark] class BlockManager(
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
new ChunkedByteBuffer(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
} else {
blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
Expand All @@ -465,7 +465,7 @@ private[spark] class BlockManager(
* Must be called while holding a read lock on the block.
* Releases the read lock upon exception; keeps the read lock upon successful return.
*/
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
val level = info.level
logDebug(s"Level for block $blockId is $level")
// In order, try to read the serialized bytes from memory, then from disk, then fall back to
Expand Down Expand Up @@ -504,7 +504,7 @@ private[spark] class BlockManager(
*/
def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
}
}

Expand All @@ -521,7 +521,7 @@ private[spark] class BlockManager(
/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
Expand Down Expand Up @@ -567,7 +567,7 @@ private[spark] class BlockManager(
}

if (data != null) {
return Some(data)
return Some(new ChunkedByteBuffer(data))
}
logDebug(s"The value of block $blockId is null")
}
Expand Down Expand Up @@ -705,7 +705,7 @@ private[spark] class BlockManager(
*/
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
Expand All @@ -725,7 +725,7 @@ private[spark] class BlockManager(
*/
private def doPutBytes(
blockId: BlockId,
bytes: ByteBuffer,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
Expand All @@ -734,25 +734,22 @@ private[spark] class BlockManager(
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (level.replication > 1) {
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = bytes.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, level)
replicate(blockId, bytes, level)
}(futureExecutionContext)
} else {
null
}

bytes.rewind()
val size = bytes.limit()
val size = bytes.size

if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = dataDeserialize(blockId, bytes.duplicate())
val values = dataDeserialize(blockId, bytes)
memoryStore.putIterator(blockId, values, level) match {
case Right(_) => true
case Left(iter) =>
Expand Down Expand Up @@ -922,7 +919,7 @@ private[spark] class BlockManager(
try {
replicate(blockId, bytesToReplicate, level)
} finally {
BlockManager.dispose(bytesToReplicate)
bytesToReplicate.dispose()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
Expand All @@ -944,29 +941,27 @@ private[spark] class BlockManager(
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
diskBytes: ByteBuffer): ByteBuffer = {
diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
require(!level.deserialized)
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
BlockManager.dispose(diskBytes)
diskBytes.dispose()
memoryStore.getBytes(blockId).get
} else {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
// this action is put into a `() => ByteBuffer` and created lazily.
val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
copyForMemory.put(diskBytes)
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
diskBytes.copy()
})
if (putSucceeded) {
BlockManager.dispose(diskBytes)
diskBytes.dispose()
memoryStore.getBytes(blockId).get
} else {
diskBytes.rewind()
diskBytes
}
}
Expand Down Expand Up @@ -1032,7 +1027,7 @@ private[spark] class BlockManager(
* Replicate block to another node. Not that this is a blocking call that returns after
* the block has been replicated.
*/
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
Expand Down Expand Up @@ -1085,11 +1080,15 @@ private[spark] class BlockManager(
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
peer.host,
peer.port,
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
tLevel)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
Expand All @@ -1112,7 +1111,7 @@ private[spark] class BlockManager(
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
Expand Down Expand Up @@ -1154,7 +1153,7 @@ private[spark] class BlockManager(
*/
def dropFromMemory(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
Expand Down Expand Up @@ -1281,11 +1280,11 @@ private[spark] class BlockManager(
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}

/** Serializes into a byte buffer. */
def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
val byteStream = new ByteBufferOutputStream(4096)
dataSerializeStream(blockId, byteStream, values)
byteStream.toByteBuffer
/** Serializes into a chunked byte buffer. */
def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
}

/**
Expand All @@ -1297,6 +1296,14 @@ private[spark] class BlockManager(
dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
}

/**
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
}

/**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
Expand Down Expand Up @@ -1325,24 +1332,9 @@ private[spark] class BlockManager(
}


private[spark] object BlockManager extends Logging {
private[spark] object BlockManager {
private val ID_GENERATOR = new IdGenerator

/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
* waiting for the GC to find it because that could lead to huge numbers of open files. There's
* unfortunately no standard API to do this.
*/
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logTrace(s"Unmapping $buffer")
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
}
}
}

def blockIdsToHosts(
blockIds: Array[BlockId],
env: SparkEnv,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.spark.storage

import java.nio.ByteBuffer

import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.util.io.ChunkedByteBuffer

/**
* This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
* This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
* so that the corresponding block's read lock can be released once this buffer's references
* are released.
*
Expand All @@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
private[storage] class BlockManagerManagedBuffer(
blockManager: BlockManager,
blockId: BlockId,
buf: ByteBuffer) extends NioManagedBuffer(buf) {
chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {

override def retain(): ManagedBuffer = {
super.retain()
Expand Down
Loading

0 comments on commit 6c2d894

Please sign in to comment.