SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails for (empty) RDD of Nothing

Follow-on to apache#4591

Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.

CC rxin since you reviewed the last one, although I imagine this is an uncontroversial resolution.
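For context, a minimal sketch of how these problem types arise (a hypothetical spark-shell session; `sc` is the usual SparkContext and the variable names are illustrative):

    // Scala's type inference produces the troublesome element types.
    val a = sc.parallelize(Seq())       // inferred as RDD[Nothing]
    val b = sc.parallelize(Seq(null))   // inferred as RDD[Null]
    val c = sc.parallelize(Seq[Int]())  // RDD[Int]: the explicit form the new docs recommend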

Author: Sean Owen <[email protected]>

Closes apache#4698 from srowen/SPARK-5744.2 and squashes the following commits:

9b2a811 [Sean Owen] 2 extra javadoc fixes
d1b9fba [Sean Owen] Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.
srowen committed Feb 20, 2015
1 parent 70bfb5c commit d3dfebe
Showing 5 changed files with 24 additions and 1 deletion.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
    * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
    * modified collection. Pass a copy of the argument to avoid this.
+   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
+   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()
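As a rough illustration of the distinction this note draws (again assuming a SparkContext `sc`; the exact partition count depends on `defaultParallelism`):

    val noPartitions = sc.emptyRDD[Int]               // an RDD[Int] with zero partitions
    val emptyPartitions = sc.parallelize(Seq[Int]())  // empty partitions, one per slice
    noPartitions.partitions.length                    // 0
    emptyPartitions.partitions.length                 // defaultParallelism, e.g. 8 locally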
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -213,7 +213,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     } else {
       basicBucketFunction _
     }
-    self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    if (self.partitions.length == 0) {
+      new Array[Long](buckets.length - 1)
+    } else {
+      // reduce() requires a non-empty RDD. This works because the mapPartitions will make
+      // non-empty partitions out of empty ones. But it doesn't handle the no-partitions case,
+      // which is handled above
+      self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    }
   }
 
 }
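A quick sketch of the behavior after this fix (assuming a SparkContext `sc`; previously the zero-partition case failed inside reduce()):

    val empty = sc.emptyRDD[Double]        // zero partitions, so reduce() alone would fail
    empty.histogram(Array(0.0, 1.0, 2.0))  // Array(0L, 0L): one zero count per bucket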
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,6 +1146,9 @@ abstract class RDD[T: ClassTag](
    * Take the first num elements of the RDD. It works by first scanning one partition, and using
    * the results from that partition to estimate the number of additional partitions needed to
    * satisfy the limit.
+   *
+   * @note due to complications in the internal implementation, this method will raise
+   * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = {
     if (num == 0) {
@@ -1258,6 +1261,10 @@ abstract class RDD[T: ClassTag](
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
+   * @note due to complications in the internal implementation, this method will raise an
+   * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
+   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
+   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
    * @return true if and only if the RDD contains no elements at all. Note that an RDD
    * may be empty even when it has at least 1 partition.
    */
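A sketch of the failure mode these notes describe (hypothetical spark-shell session; the calls that throw are commented out):

    val nothings = sc.parallelize(Seq())     // RDD[Nothing], per the note above
    // nothings.take(1)                      // throws at runtime
    // nothings.isEmpty()                    // also throws, since isEmpty is built on take(1)
    sc.parallelize(Seq[String]()).isEmpty()  // true: safe once the element type is explicit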
4 changes: 4 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -708,6 +708,10 @@ public void javaDoubleRDDHistoGram() {
     // Test with provided buckets
     long[] histogram = rdd.histogram(expected_buckets);
     Assert.assertArrayEquals(expected_counts, histogram);
+    // SPARK-5744
+    Assert.assertArrayEquals(
+        new long[] {0},
+        sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
   }
 
   @Test
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -33,6 +33,9 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
+    val emptyRDD: RDD[Double] = sc.emptyRDD
+    assert(emptyRDD.histogram(buckets) === expectedHistogramResults)
+    assert(emptyRDD.histogram(buckets, true) === expectedHistogramResults)
   }
 
   test("WorksWithOutOfRangeWithOneBucket") {
