[SPARK-4452] [CORE] Shuffle data structures can starve others on the same thread for memory

## What changes were proposed in this pull request?
apache#9241 introduced a mechanism that calls spill() on SQL operators that support spilling when there is not enough memory for execution. However, ExternalSorter and AppendOnlyMap in Spark core did not participate in that mechanism. This PR makes them benefit from apache#9241 as well: when there is not enough memory for execution, memory can now be reclaimed by spilling ExternalSorter and AppendOnlyMap in Spark core.
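
At a high level, the mechanism works like this: the task's memory manager keeps track of the spillable consumers registered for that task, and when an allocation cannot be satisfied it asks other consumers to spill their in-memory data to disk and hand back the freed memory. The following is a minimal, self-contained sketch of that contract — `SpillableConsumer`, `ToyTaskMemoryManager`, and their signatures are illustrative assumptions, not Spark's actual API:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a spillable structure such as
// ExternalSorter or ExternalAppendOnlyMap.
trait SpillableConsumer {
  def memoryUsed: Long
  // Spill in-memory data to disk and return the number of bytes released.
  def forceSpill(): Long
}

// Hypothetical stand-in for TaskMemoryManager's acquisition path.
class ToyTaskMemoryManager(private var freeBytes: Long) {
  private val consumers = ArrayBuffer.empty[SpillableConsumer]

  def register(c: SpillableConsumer): Unit = consumers += c

  // Grant up to `size` bytes, force-spilling other consumers when short.
  def acquire(size: Long, requester: SpillableConsumer): Long = {
    for (c <- consumers if freeBytes < size && (c ne requester) && c.memoryUsed > 0) {
      freeBytes += c.forceSpill() // reclaim whatever the spill released
    }
    val granted = math.min(size, freeBytes)
    freeBytes -= granted
    granted
  }
}
```

Before this PR, ExternalSorter and ExternalAppendOnlyMap tracked their memory but offered no force-spill hook, so a consumer on the same thread could starve while they held execution memory; the diff below wires them into this contract.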

## How was this patch tested?
Added two unit tests for it.

Author: Lianhui Wang <[email protected]>

Closes apache#10024 from lianhuiwang/SPARK-4452-2.
lianhuiwang authored and davies committed Apr 21, 2016
1 parent 649335d commit 4f36917
Showing 8 changed files with 324 additions and 46 deletions.
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
/**
* Returns the size of used memory in bytes.
*/
-  long getUsed() {
+  protected long getUsed() {
return used;
}

@@ -130,4 +130,21 @@ protected void freePage(MemoryBlock page) {
used -= page.size();
taskMemoryManager.freePage(page, this);
}

/**
* Allocates `size` bytes of on-heap memory.
*/
public long acquireOnHeapMemory(long size) {
long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
used += granted;
return granted;
}

/**
* Releases `size` bytes of on-heap memory.
*/
public void freeOnHeapMemory(long size) {
taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
used -= size;
}
}
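
As an aside, here is a hedged sketch (not Spark source; the class name, the 8-bytes-per-reference estimate, and the simplified spill behavior are assumptions) of how a MemoryConsumer subclass might use the new acquireOnHeapMemory/freeOnHeapMemory helpers:

```scala
import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}

// Illustrative consumer (hypothetical); assumes the TaskMemoryManager of
// the current task is passed in.
class SketchConsumer(tmm: TaskMemoryManager) extends MemoryConsumer(tmm) {
  private var rows: Array[AnyRef] = _

  // Reserve bookkeeping for roughly `n` object references, assuming an
  // illustrative 8 bytes per reference.
  def allocateRows(n: Int): Unit = {
    val needed = n.toLong * 8
    val granted = acquireOnHeapMemory(needed) // may be less than requested
    if (granted < needed) {
      freeOnHeapMemory(granted) // hand back what we cannot use
      throw new OutOfMemoryError(s"needed $needed bytes, got $granted")
    } else {
      rows = new Array[AnyRef](n)
    }
  }

  // Called by TaskMemoryManager when memory is needed elsewhere.
  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    if (rows == null) {
      0L
    } else {
      val released = rows.length.toLong * 8
      rows = null // a real consumer would persist the rows to disk first
      freeOnHeapMemory(released)
      released
    }
  }
}
```

The `protected` change to getUsed() serves the same bookkeeping: subclasses can inspect how much memory they are currently charged for, while the acquire/free helpers keep `used` in sync with what TaskMemoryManager has granted.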
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public long cleanUpAllAllocatedMemory() {
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}

/**
* Returns the Tungsten memory mode.
*/
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}
}
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -61,10 +61,10 @@ class ExternalAppendOnlyMap[K, V, C](
blockManager: BlockManager = SparkEnv.get.blockManager,
context: TaskContext = TaskContext.get(),
serializerManager: SerializerManager = SparkEnv.get.serializerManager)
-  extends Iterable[(K, C)]
+  extends Spillable[SizeTracker](context.taskMemoryManager())
   with Serializable
   with Logging
-  with Spillable[SizeTracker] {
+  with Iterable[(K, C)] {

if (context == null) {
throw new IllegalStateException(
@@ -81,9 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
}

-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager
@@ -117,6 +115,8 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

@volatile private var readingIterator: SpillableIterator = null

/**
* Number of files this map has spilled so far.
* Exposed for testing.
@@ -182,6 +182,29 @@ class ExternalAppendOnlyMap[K, V, C](
* Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
*/
override protected[this] def spill(collection: SizeTracker): Unit = {
val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
spilledMaps.append(diskMapIterator)
}

/**
* Forces spilling the current in-memory collection to disk to release memory.
* It will be called by TaskMemoryManager when there is not enough memory for the task.
*/
override protected[this] def forceSpill(): Boolean = {
assert(readingIterator != null)
val isSpilled = readingIterator.spill()
if (isSpilled) {
currentMap = null
}
isSpilled
}

/**
* Spill the in-memory Iterator to a temporary file on disk.
*/
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
: DiskMapIterator = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -202,9 +225,8 @@

var success = false
try {
-      val it = currentMap.destructiveSortedIterator(keyComparator)
-      while (it.hasNext) {
-        val kv = it.next()
+      while (inMemoryIterator.hasNext) {
+        val kv = inMemoryIterator.next()
writer.write(kv._1, kv._2)
objectsWritten += 1

@@ -237,7 +259,17 @@
}
}

-    spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+    new DiskMapIterator(file, blockId, batchSizes)
}

/**
* Returns a destructive iterator for iterating over the entries of this map.
* If this iterator is force-spilled to disk to release memory when there is not enough memory,
* it returns pairs from an on-disk map.
*/
def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
readingIterator = new SpillableIterator(inMemoryIterator)
readingIterator
}

/**
Expand All @@ -250,15 +282,18 @@ class ExternalAppendOnlyMap[K, V, C](
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
-      CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+      CompletionIterator[(K, C), Iterator[(K, C)]](
+        destructiveIterator(currentMap.iterator), freeCurrentMap())
} else {
new ExternalIterator()
}
}

private def freeCurrentMap(): Unit = {
-    currentMap = null // So that the memory can be garbage-collected
-    releaseMemory()
+    if (currentMap != null) {
+      currentMap = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
}

/**
Expand All @@ -272,8 +307,8 @@ class ExternalAppendOnlyMap[K, V, C](

// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
-      currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
@@ -532,8 +567,56 @@
context.addTaskCompletionListener(context => cleanup())
}

private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
extends Iterator[(K, C)] {

private val SPILL_LOCK = new Object()

private var nextUpstream: Iterator[(K, C)] = null

private var cur: (K, C) = readNext()

private var hasSpilled: Boolean = false

def spill(): Boolean = SPILL_LOCK.synchronized {
if (hasSpilled) {
false
} else {
logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
nextUpstream = spillMemoryIteratorToDisk(upstream)
hasSpilled = true
true
}
}

def readNext(): (K, C) = SPILL_LOCK.synchronized {
if (nextUpstream != null) {
upstream = nextUpstream
nextUpstream = null
}
if (upstream.hasNext) {
upstream.next()
} else {
null
}
}

override def hasNext(): Boolean = cur != null

override def next(): (K, C) = {
val r = cur
cur = readNext()
r
}
}

/** Convenience function to hash the given (K, C) pair by the key. */
private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)

override def toString(): String = {
this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
}
}

private[spark] object ExternalAppendOnlyMap {
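
The SpillableIterator added above is the heart of the read path: it wraps the in-memory iterator and, under SPILL_LOCK, can swap its upstream for an on-disk iterator between element reads, so another consumer's allocation can free this map's memory even mid-iteration. Below is a stripped-down illustration of the same swap-on-spill pattern (hypothetical code; real spilling is replaced by materializing the remaining elements into a List):

```scala
// Illustration of the swap-on-spill iterator pattern; not Spark source.
class SwappableIterator[T](source: Iterator[T]) extends Iterator[T] {
  private val lock = new Object
  private var upstream: Iterator[T] = source
  private var spilled = false

  // Redirect the not-yet-consumed suffix elsewhere so the collection
  // backing `source` can be dropped. Returns false if already spilled.
  def spill(): Boolean = lock.synchronized {
    if (spilled) {
      false
    } else {
      upstream = upstream.toList.iterator // stand-in for spillMemoryIteratorToDisk
      spilled = true
      true
    }
  }

  override def hasNext: Boolean = lock.synchronized(upstream.hasNext)
  override def next(): T = lock.synchronized(upstream.next())
}
```

Where this sketch holds the lock in hasNext/next, the real implementation pre-reads one element into `cur` inside the lock (readNext), which keeps hasNext lock-free while still guaranteeing that a concurrent spill can only swap the upstream between reads.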