[SPARK-2412] CoalescedRDD throws exception with certain pref locs
If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown: "groupHash.get(nxt_replica).get" calls .get on a None for any location the first pass never recorded.

The fix is just to add an ArrayBuffer to groupHash for that replica if it didn't already exist.
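
In other words, the failure is in how the mutable map is accessed. A minimal standalone sketch of the old and new access patterns (the key "m2" and the Int values are illustrative only, not taken from the patch):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    val groupHash = HashMap[String, ArrayBuffer[Int]]()

    // Old second-pass pattern: Map#get returns an Option, so chaining .get
    // throws NoSuchElementException for a key never added by the first pass.
    // groupHash.get("m2").get += 1

    // Fixed pattern: create an empty buffer for a previously unseen key,
    // then append to whichever buffer comes back.
    groupHash.getOrElseUpdate("m2", ArrayBuffer()) += 1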

Author: Aaron Davidson <[email protected]>

Closes apache#1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
aarondav authored and pwendell committed Jul 17, 2014
1 parent 9c24974 commit 7c23c0d
Showing 2 changed files with 16 additions and 2 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2
14 changes: 14 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))
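
For context on the test's constant (a quick sanity check, not part of the patch): math.log(50) is about 3.912, so (3.912 * 50 + 50 + 0.5).toInt = 246 and 2 * 246 = 492, matching the inline "// = 492" comment. Partitions 1 through 492 of the 1000 therefore prefer "m1" and the rest prefer "m2"; as the test name suggests, the point is that the first pass runs out of its coupon-collector iteration budget before ever seeing "m2", so the second pass has to create a group for a location with no groupHash entry, which is exactly the case the old code threw on.

    // Standalone check of the constant, mirroring the test's expression.
    val targetLen = 50
    val couponCount = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
    assert(couponCount == 492)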
