[SPARK-27468][CORE] Track correct storage level of RDDs and partitions
Previously, the RDD's storage level would change depending on the status
reported by executors for the blocks they were storing, and individual blocks
would reflect that level. That is wrong because different blocks may be stored
differently in different executors.

So now the RDD tracks the user-provided storage level, while each individual
partition reflects the current storage level of its particular block,
including the current number of replicas.
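
In effect, a partition's displayed level is now derived from what is actually
stored, while the RDD keeps the level the user asked for. A minimal sketch of
the idea (the helper effectiveLevel is hypothetical; StorageLevel, its apply
factory and description come from Spark):

    import org.apache.spark.storage.StorageLevel

    // Mirrors the logic this commit adds to LiveRDDPartition.update():
    // disk/memory flags come from the bytes actually stored, while the
    // off-heap and deserialized flags are inherited from the requested level.
    def effectiveLevel(
        requested: StorageLevel,
        memoryUsed: Long,
        diskUsed: Long,
        replicas: Int): StorageLevel = {
      StorageLevel(
        diskUsed > 0,               // useDisk only if bytes are on disk
        memoryUsed > 0,             // useMemory only if bytes are in memory
        requested.useOffHeap,
        if (memoryUsed > 0) requested.deserialized else false,
        replicas)
    }

    // A block requested as MEMORY_AND_DISK but fully evicted to disk:
    effectiveLevel(StorageLevel.MEMORY_AND_DISK, 0L, 100L, 1).description
    // expected: "Disk Serialized 1x Replicated"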

Closes apache#25779 from vanzin/SPARK-27468.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
Marcelo Vanzin authored and squito committed Oct 7, 2019
1 parent 64fe82b commit d2f21b0
Showing 4 changed files with 68 additions and 34 deletions.
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -234,8 +234,8 @@ private[spark] class AppStatusListener(
            (partition.memoryUsed / partition.executors.length) * -1)
          rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
            (partition.diskUsed / partition.executors.length) * -1)
-          partition.update(partition.executors
-            .filter(!_.equals(event.executorId)), rdd.storageLevel,
+          partition.update(
+            partition.executors.filter(!_.equals(event.executorId)),
            addDeltaToValue(partition.memoryUsed,
              (partition.memoryUsed / partition.executors.length) * -1),
            addDeltaToValue(partition.diskUsed,
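
The hunk above subtracts one executor's proportional share of a partition's
usage when that executor is removed. A worked sketch of the arithmetic,
assuming addDeltaToValue is the usual clamp-at-zero helper in
AppStatusListener:

    // Assumed shape of the helper: never lets accounting go negative.
    def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

    // One partition replicated on three executors, 300 bytes in memory total:
    val executors = Seq("exec-1", "exec-2", "exec-3")
    val memoryUsed = 300L
    val share = memoryUsed / executors.length                   // 100 bytes per executor
    val afterRemoval = addDeltaToValue(memoryUsed, share * -1)  // 200 bytes remain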
@@ -495,7 +495,7 @@ private[spark] class AppStatusListener(

    event.stageInfo.rddInfos.foreach { info =>
      if (info.storageLevel.isValid) {
-        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
+        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info, info.storageLevel)), now)
      }
    }

@@ -916,12 +916,6 @@ private[spark] class AppStatusListener(
    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)

-    val updatedStorageLevel = if (storageLevel.isValid) {
-      Some(storageLevel.description)
-    } else {
-      None
-    }
-
    // We need information about the executor to update some memory accounting values in the
    // RDD info, so read that beforehand.
    val maybeExec = liveExecutors.get(executorId)
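
The deltas above let a single block update both credit and debit usage,
depending on which parts of the new level are still in use. A small worked
example with illustrative sizes:

    // If the new level no longer uses memory (e.g. evicted to DISK_ONLY),
    // the reported memSize is subtracted; disk usage stays positive.
    val memSize = 100L
    val diskSize = 200L
    val useMemory = false   // the new level dropped the in-memory copy
    val useDisk = true
    val memoryDelta = memSize * (if (useMemory) 1 else -1)   // -100
    val diskDelta = diskSize * (if (useDisk) 1 else -1)      // +200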
@@ -936,13 +930,9 @@
      // Update the block entry in the RDD info, keeping track of the deltas above so that we
      // can update the executor information too.
      liveRDDs.get(block.rddId).foreach { rdd =>
-        if (updatedStorageLevel.isDefined) {
-          rdd.setStorageLevel(updatedStorageLevel.get)
-        }
-
        val partition = rdd.partition(block.name)

-        val executors = if (updatedStorageLevel.isDefined) {
+        val executors = if (storageLevel.isValid) {
          val current = partition.executors
          if (current.contains(executorId)) {
            current
@@ -957,7 +947,7 @@

        // Only update the partition if it's still stored in some executor, otherwise get rid of it.
        if (executors.nonEmpty) {
-          partition.update(executors, rdd.storageLevel,
+          partition.update(executors,
            addDeltaToValue(partition.memoryUsed, memoryDelta),
            addDeltaToValue(partition.diskUsed, diskDelta))
        } else {
37 changes: 24 additions & 13 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -30,7 +30,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
@@ -458,7 +458,13 @@ private class LiveStage extends LiveEntity {

}

-private class LiveRDDPartition(val blockName: String) {
+/**
+ * Data about a single partition of a cached RDD. The RDD storage level is used to compute the
+ * effective storage level of the partition, which takes into account the storage actually being
+ * used by the partition in the executors, and thus may differ from the storage level requested
+ * by the application.
+ */
+private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

  import LiveEntityHelpers._

@@ -476,12 +482,13 @@ private class LiveRDDPartition(val blockName: String) {

  def update(
      executors: Seq[String],
-      storageLevel: String,
      memoryUsed: Long,
      diskUsed: Long): Unit = {
+    val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
+      if (memoryUsed > 0) rddLevel.deserialized else false, executors.size)
    value = new v1.RDDPartitionInfo(
      blockName,
-      weakIntern(storageLevel),
+      weakIntern(level.description),
      memoryUsed,
      diskUsed,
      executors)
@@ -520,27 +527,31 @@ private class LiveRDDDistribution(exec: LiveExecutor) {

}

-private class LiveRDD(val info: RDDInfo) extends LiveEntity {
+/**
+ * Tracker for data related to a persisted RDD.
+ *
+ * The RDD storage level is immutable, following the current behavior of `RDD.persist()`, even
+ * though it is mutable in the `RDDInfo` structure. Since the listener does not track unpersisted
+ * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later
+ * stage is started after the RDD is marked for caching.
+ */
+private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {

  import LiveEntityHelpers._

-  var storageLevel: String = weakIntern(info.storageLevel.description)
  var memoryUsed = 0L
  var diskUsed = 0L

+  private val levelDescription = weakIntern(storageLevel.description)
  private val partitions = new HashMap[String, LiveRDDPartition]()
  private val partitionSeq = new RDDPartitionSeq()

  private val distributions = new HashMap[String, LiveRDDDistribution]()

-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
-  }
-
  def partition(blockName: String): LiveRDDPartition = {
    partitions.getOrElseUpdate(blockName, {
-      val part = new LiveRDDPartition(blockName)
-      part.update(Nil, storageLevel, 0L, 0L)
+      val part = new LiveRDDPartition(blockName, storageLevel)
+      part.update(Nil, 0L, 0L)
      partitionSeq.addPartition(part)
      part
    })
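
The immutability assumption in the comment above matches the behavior of
RDD.persist(): once a level is assigned, changing it throws. A self-contained
sketch (local master and app name are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("persist-demo"))
    val rdd = sc.parallelize(1 to 10).persist(StorageLevel.MEMORY_ONLY)
    // A second persist() with a different level throws
    // UnsupportedOperationException, so the level captured when the listener
    // first sees the RDD as valid can safely be treated as final:
    // rdd.persist(StorageLevel.DISK_ONLY)   // would throw
    sc.stop()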
@@ -578,7 +589,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
      info.name,
      info.numPartitions,
      partitions.size,
-      storageLevel,
+      levelDescription,
      memoryUsed,
      diskUsed,
      dists,
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -42,6 +42,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    .set(ASYNC_TRACKING_ENABLED, false)

+  private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
+
  private var time: Long = _
  private var testDir: File = _
  private var store: ElementTrackingStore = _
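
The positional constructor used above reads as (useDisk, useMemory,
useOffHeap, deserialized, replication), so the helper should be equivalent to
Spark's built-in two-replica constant:

    import org.apache.spark.storage.StorageLevel

    val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
    assert(twoReplicaMemAndDiskLevel == StorageLevel.MEMORY_AND_DISK_2)
    // description: "Disk Memory Deserialized 2x Replicated"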
@@ -697,8 +699,16 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    val rdd2b1 = RddBlock(2, 1, 5L, 6L)
    val level = StorageLevel.MEMORY_AND_DISK

+    // Submit a stage for the first RDD before it's marked for caching, to make sure later
+    // the listener picks up the correct storage level.
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, StorageLevel.NONE, false, Nil)
+    val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage0, new Properties()))
+    listener.onStageCompleted(SparkListenerStageCompleted(stage0))
+    assert(store.count(classOf[RDDStorageInfoWrapper]) === 0)
+
    // Submit a stage and make sure the RDDs are recorded.
-    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
+    rdd1Info.storageLevel = level
    val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil)
    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1")
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
@@ -763,6 +773,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
      assert(part.memoryUsed === rdd1b1.memSize * 2)
      assert(part.diskUsed === rdd1b1.diskSize * 2)
      assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+      assert(part.storageLevel === twoReplicaMemAndDiskLevel.description)
    }

    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
@@ -800,9 +811,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
    }

-    // Remove block 1 from bm 1.
+    // Evict block 1 from memory in bm 1. Note that because of SPARK-29319, the disk size
+    // is reported as "0" here to avoid double-counting; the current behavior of the block
+    // manager is to provide the actual disk size of the block.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.DISK_ONLY,
+        rdd1b1.memSize, 0L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 2L)
+      assert(wrapper.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === rdd1b2.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+    }
+
+    // Remove block 1 from bm 1; note memSize = 0 due to the eviction above.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
-      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0, rdd1b1.diskSize)))

    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 2L)
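
The expected sizes in these checks follow from which replicas survive the
eviction. A self-contained reconstruction of the accounting (the RddBlock
sizes here are placeholders; the suite defines the real values earlier):

    case class RddBlock(rddId: Int, partId: Int, memSize: Long, diskSize: Long)
    val rdd1b1 = RddBlock(1, 1, 1L, 2L)   // placeholder sizes
    val rdd1b2 = RddBlock(1, 2, 3L, 4L)   // placeholder sizes

    // bm1 evicted rdd1b1 from memory but keeps it on disk; bm2 still holds a
    // full in-memory + on-disk replica; rdd1b2 lives only on bm1.
    val memoryUsed = rdd1b1.memSize + rdd1b2.memSize        // bm2's copy + bm1's rdd1b2
    val diskUsed = 2 * rdd1b1.diskSize + rdd1b2.diskSize    // on disk in both executors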
@@ -1571,7 +1603,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

      val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
-      assert(part1.storageLevel === level.description)
+      assert(part1.storageLevel === twoReplicaMemAndDiskLevel.description)
      assert(part1.memoryUsed === 2 * rdd1b1.memSize)
      assert(part1.diskUsed === 2 * rdd1b1.diskSize)
      assert(part1.executors === Seq(bm1.executorId, bm2.executorId))
core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.status

import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel

class LiveEntitySuite extends SparkFunSuite {
@@ -59,8 +60,8 @@ class LiveEntitySuite extends SparkFunSuite {
  }

  private def newPartition(i: Int): LiveRDDPartition = {
-    val part = new LiveRDDPartition(i.toString)
-    part.update(Seq(i.toString), i.toString, i, i)
+    val part = new LiveRDDPartition(i.toString, StorageLevel.MEMORY_AND_DISK)
+    part.update(Seq(i.toString), i, i)
    part
  }
