Skip to content

Commit

Permalink
[Minor] Fix style, formatting and naming in BlockManager etc.
Browse files Browse the repository at this point in the history
This is a precursor to a bigger change. I wanted to separate out the relatively insignificant changes so the ultimate PR is not inflated.

(Warning: this PR is full of unimportant nitpicks)

Author: Andrew Or <[email protected]>

Closes apache#1058 from andrewor14/bm-minor and squashes the following commits:

8e12eaf [Andrew Or] SparkException -> BlockException
c36fd53 [Andrew Or] Make parts of BlockManager more readable
0a5f378 [Andrew Or] Entry -> MemoryEntry
e9762a5 [Andrew Or] Tone down string interpolation (minor reverts)
c4de9ac [Andrew Or] Merge branch 'master' of github.com:apache/spark into bm-minor
b3470f1 [Andrew Or] More string interpolation (minor)
7f9dcab [Andrew Or] Use string interpolation (minor)
94a425b [Andrew Or] Refactor against duplicate code + minor changes
8a6a7dc [Andrew Or] Exception -> SparkException
97c410f [Andrew Or] Deal with MIMA excludes
2480f1d [Andrew Or] Fixes in StorageLevel.scala
abb0163 [Andrew Or] Style, formatting and naming fixes
  • Loading branch information
andrewor14 authored and pwendell committed Jun 13, 2014
1 parent 1de1d70 commit 44daec5
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 370 deletions.
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
def getOrCompute[T](
rdd: RDD[T],
split: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
Expand All @@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
logInfo(s"Another thread is loading $key, waiting for it to finish...")
while (loading.contains(key)) {
try {
loading.wait()
Expand All @@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
logInfo("Finished waiting for %s".format(key))
logInfo(s"Finished waiting for $key")
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
Expand All @@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
loading.add(key)
}
} else {
Expand All @@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// Persist the result, so long as the task is not running locally
Expand All @@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted valued")
logInfo(s"Failure to store $key")
throw new SparkException("Block manager failed to return persisted value")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
setInitThread()

private def setInitThread() {
// Set current thread as init thread - waitForReady will not block this thread
// (in case there is non trivial initialization which ends up calling waitForReady as part of
// initialization itself)
/* Set current thread as init thread - waitForReady will not block this thread
* (in case there is non trivial initialization which ends up calling waitForReady
* as part of initialization itself) */
BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
}

Expand All @@ -42,16 +42,18 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
def waitForReady(): Boolean = {
if (pending && initThread != Thread.currentThread()) {
synchronized {
while (pending) this.wait()
while (pending) {
this.wait()
}
}
}
!failed
}

/** Mark this BlockInfo as ready (i.e. block is finished writing) */
def markReady(sizeInBytes: Long) {
require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
assert (pending)
require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
assert(pending)
size = sizeInBytes
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
Expand All @@ -61,7 +63,7 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea

/** Mark this BlockInfo as ready but failed */
def markFailure() {
assert (pending)
assert(pending)
size = BlockInfo.BLOCK_FAILED
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
Expand All @@ -71,9 +73,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
}

private object BlockInfo {
// initThread is logically a BlockInfo field, but we store it here because
// it's only needed while this block is in the 'pending' state and we want
// to minimize BlockInfo's memory footprint.
/* initThread is logically a BlockInfo field, but we store it here because
* it's only needed while this block is in the 'pending' state and we want
* to minimize BlockInfo's memory footprint. */
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]

private val BLOCK_PENDING: Long = -1L
Expand Down
Loading

0 comments on commit 44daec5

Please sign in to comment.