Renamed countDistinct and countDistinctByKey methods to include Approx
falaki committed Dec 31, 2013
1 parent d50ccc5 commit a7de8e9
Showing 5 changed files with 15 additions and 15 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -217,7 +217,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* more accurate counts but increase the memory footprint and vice versa. Uses the provided
* Partitioner to partition the output RDD.
*/
- def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
val createHLL = (v: V) => {
val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
hll.value.offer(v)
@@ -242,8 +242,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* output RDD into numPartitions.
*
*/
- def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
-   countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+   countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}

/**
@@ -254,8 +254,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
*/
- def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
-   countDistinctByKey(relativeSD, defaultPartitioner(self))
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+   countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
}

/**
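For context, a minimal usage sketch of the renamed per-key API. This is illustrative only: it assumes an existing SparkContext named sc and the implicit pair-RDD conversions imported from SparkContext._; the data is made up so that the true per-key cardinality is known.

    import org.apache.spark.SparkContext._

    // Key i carries exactly i distinct values, so the true distinct
    // count for key i is i.
    val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))

    // Smaller relativeSD buys tighter estimates at the cost of larger
    // HyperLogLog state per key (more memory).
    val approxCounts = pairs.countApproxDistinctByKey(relativeSD = 0.05).collect()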
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -797,7 +797,7 @@ abstract class RDD[T: ClassTag](
* more accurate counts but increase the memory footprint and vice versa. The default value of
* relativeSD is 0.05.
*/
- def countDistinct(relativeSD: Double = 0.05): Long = {
+ def countApproxDistinct(relativeSD: Double = 0.05): Long = {

def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
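The whole-RDD variant follows the same accuracy/memory trade-off. A hedged sketch, again assuming an existing SparkContext sc:

    // 100,000 elements but only 100 distinct values.
    val data = sc.makeRDD(for (i <- 1 to 100000) yield i % 100)

    // A looser relativeSD uses less memory but widens the error band;
    // both calls should return an estimate near 100.
    val rough = data.countApproxDistinct(0.2)   // within roughly 20%
    val tight = data.countApproxDistinct(0.01)  // within roughly 1%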
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -110,7 +110,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
}

test("countDistinctByKey") {
test("countApproxDistinctByKey") {
def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

/* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
@@ -124,7 +124,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
// Therefore, the expected count for key i would be i.
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
val rdd1 = sc.parallelize(stacked)
- val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+ val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
counted1.foreach{
case(k, count) => assert(error(count, k) < relativeSD)
}
@@ -137,7 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
(1 to num).map(j => (num, j))
}
val rdd2 = sc.parallelize(randStacked)
- val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
counted2.foreach{
case(k, count) => assert(error(count, k) < relativeSD)
}
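The suite's accuracy check is a plain relative-error bound. The same idea in isolation, using the suite's own definition of error (the sample values here are hypothetical):

    // Relative error of an estimate against the true cardinality,
    // exactly as the suite defines it: |est - size| / size.
    def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble

    assert(error(96L, 100L) < 0.05)  // a 4% error passes the 5% bound
    assert(error(89L, 100L) > 0.05)  // an 11% error would fail it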
10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}

test("countDistinct") {
test("countApproxDistinct") {

def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

val size = 100
val uniformDistro = for (i <- 1 to 100000) yield i % size
val simpleRdd = sc.makeRDD(uniformDistro)
- assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
- assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
- assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
- assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+ assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
}

test("SparkContext.union") {
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -173,7 +173,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}

test("kryo with SerializableHyperLogLog") {
- assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
+ assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
}

test("kryo with reduce") {
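This test exercises serialization of the SerializableHyperLogLog wrapper during the aggregation. A hedged setup sketch, assuming a Spark build that configures the serializer through SparkConf (contemporary builds set the spark.serializer system property instead):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("countApproxDistinct-kryo-demo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Nine elements, three distinct values; the estimate should be 3.
    val estimate = sc.parallelize(Seq(1, 2, 3, 2, 3, 3, 2, 3, 1)).countApproxDistinct(0.01)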
