Skip to content

Commit

Permalink
[SPARK-16166][CORE] Also take off-heap memory usage into consideratio…
Browse files Browse the repository at this point in the history
…n in log and webui display

## What changes were proposed in this pull request?

Currently, the log and web UI only calculate and display on-heap storage memory, for example:

```
16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB)
```
<img width="1232" alt="untitled" src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png">

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), off-heap memory is supported for data persistence, so this change also takes off-heap storage memory into consideration.

## How was this patch tested?

Unit test and local verification.

Author: jerryshao <[email protected]>

Closes apache#13920 from jerryshao/SPARK-16166.
  • Loading branch information
jerryshao authored and JoshRosen committed Jul 25, 2016
1 parent cda4603 commit f5ea7fe
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 9 deletions.
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ private[spark] abstract class MemoryManager(
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
* the MemoryManager implementation.
* Total available on-heap memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
def maxOnHeapStorageMemory: Long

/**
* Total available off-heap memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
*/
def maxOffHeapStorageMemory: Long

/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager(
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

/** The static memory manager reports no off-heap storage memory: this limit is always zero. */
override def maxOffHeapStorageMemory: Long = 0L

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

/**
 * Off-heap memory that storage may currently use, in bytes: the off-heap maximum minus
 * whatever the off-heap execution pool is using right now. Evaluated while holding this
 * manager's lock.
 */
override def maxOffHeapStorageMemory: Long = synchronized {
  val usedByExecution = offHeapExecutionMemoryPool.memoryUsed
  maxOffHeapMemory - usedByExecution
}

/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
* number of bytes obtained, or 0 if none can be allocated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxOnHeapStorageMemory
// Sum of the on-heap and off-heap storage limits reported by the memory manager at the time
// this field is initialized.
private val maxMemory =
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
Expand Down Expand Up @@ -802,7 +803,7 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ private[spark] class MemoryStore(
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

/** Total amount of memory available for storage, in bytes. */
private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
private def maxMemory: Long = {
  // Combine the on-heap and off-heap storage limits; both are re-read on every call.
  val onHeapLimit = memoryManager.maxOnHeapStorageMemory
  val offHeapLimit = memoryManager.maxOffHeapStorageMemory
  onHeapLimit + offHeapLimit
}

if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TestMemoryManager(conf: SparkConf)
}
override def maxOnHeapStorageMemory: Long = Long.MaxValue

/** This test memory manager reports zero off-heap storage memory. */
override def maxOffHeapStorageMemory: Long = 0L

private var oomOnce = false
private var available = Long.MaxValue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
// Check the total and remaining memory reported by the master. Each failure message must
// state the same bound that its assertion actually checks.
assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 32000")
assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
Expand Down Expand Up @@ -269,8 +269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
memStatus._1 should equal (20000L)
memStatus._2 should equal (20000L)
memStatus._1 should equal (40000L)
memStatus._2 should equal (40000L)
}
}

Expand Down

0 comments on commit f5ea7fe

Please sign in to comment.