Skip to content

Commit

Permalink
minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
falaki committed Dec 31, 2013
1 parent d6cded7 commit acb0323
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)

- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
- case (k, v) => (k, v.value.cardinality())
- }
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())

}

/**
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag](
}
def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)

- mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+ val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+ mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters)
+ .value.cardinality()
}

/**
Expand Down

0 comments on commit acb0323

Please sign in to comment.