[SPARK-6761][SQL][ML] Fixes to API and documentation of approximate quantiles

## What changes were proposed in this pull request?

This continues thunterdb's work on the `approxQuantile` API. It changes the signature of `approxQuantile` from `(col: String, quantile: Double, epsilon: Double): Double` to `(col: String, probabilities: Array[Double], relativeError: Double): Array[Double]` and updates the API doc. It also improves the error messages in tests and simplifies the merge algorithm for summaries.
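For illustration, a minimal sketch of the new call shape (the `sqlContext`, DataFrame, and column name below are hypothetical, not part of the patch):

```scala
// Hypothetical usage of the new signature; "value" is a made-up column name.
val df = sqlContext.range(0, 1000).toDF("value")
val quantiles: Array[Double] = df.stat.approxQuantile(
  "value",
  Array(0.25, 0.5, 0.75), // quantile probabilities, each in [0, 1]
  0.01)                   // relative target error
// quantiles(1) approximates the median of "value".
```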

## How was this patch tested?

Use the same unit tests as before.

Closes apache#11325

Author: Timothy Hunter <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#11332 from mengxr/SPARK-6761.
thunterdb authored and mengxr committed Feb 23, 2016
1 parent 9cdd867 commit 15e3015
Showing 4 changed files with 150 additions and 120 deletions.
@@ -37,13 +37,37 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
final class DataFrameStatFunctions private[sql](df: DataFrame) {

/**
* Calculate the approximate quantile of numerical column of a DataFrame.
* @param col the name of the column
* @param quantile the quantile number
* @return the approximate quantile
* Calculates the approximate quantiles of a numerical column of a DataFrame.
*
* The result of this algorithm has the following deterministic bound:
* If the DataFrame has N elements and if we request the quantile at probability `p` up to error
* `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` is close to (p * N).
* More precisely,
*
* floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
*
* This method implements a variation of the Greenwald-Khanna algorithm (with some speed
* optimizations).
* The algorithm was first presented in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient
* Online Computation of Quantile Summaries]] by Greenwald and Khanna.
*
* @param col the name of the numerical column
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
* @return the approximate quantiles at the given probabilities
*
* @since 2.0.0
*/
def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = {
StatFunctions.approxQuantile(df, col, quantile, epsilon)
def approxQuantile(
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double] = {
StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, relativeError).head.toArray
}
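To make the deterministic bound above concrete, a small worked example (the numbers are illustrative, not from the patch):

```scala
// N = 1000 rows, probability p = 0.5, relative error err = 0.01.
val n = 1000
val (p, err) = (0.5, 0.01)
val lo = math.floor((p - err) * n).toInt // 490
val hi = math.ceil((p + err) * n).toInt  // 510
// Any sample x returned for p = 0.5 has exact rank(x) in [490, 510].
```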

/**
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.stat

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
@@ -33,59 +32,37 @@ private[sql] object StatFunctions extends Logging {
import QuantileSummaries.Stats

/**
* Calculates the approximate quantile for the given column.
*
* If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]]
*
* Note on the target error.
* Calculates the approximate quantiles of multiple numerical columns of a DataFrame in one pass.
*
* The result of this algorithm has the following deterministic bound:
* if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
* then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` close to (phi * N). More precisely:
*
* floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
*
* Note on the algorithm used.
* If the DataFrame has N elements and if we request the quantile at probability `p` up to error
* `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` is close to (p * N).
* More precisely,
*
* This method implements a variation of the Greenwald-Khanna algorithm
* (with some speed optimizations). The algorithm was first present in the following article:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
* floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
*
* The performance optimizations are detailed in the comments of the implementation.
*
* @param df the dataframe to estimate quantiles on
* @param col the name of the column
* @param quantile the target quantile of interest
* @param epsilon the target error. Should be >= 0.
* */
def approxQuantile(
df: DataFrame,
col: String,
quantile: Double,
epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range of (0.0, 1.0).")
val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
res
}

/**
* Runs multiple quantile computations in a single pass, with the same target error.
*
* See [[approxQuantile)]] for more details on the approximation guarantees.
* This method implements a variation of the Greenwald-Khanna algorithm (with some speed
* optimizations).
* The algorithm was first presented in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient
* Online Computation of Quantile Summaries]] by Greenwald and Khanna.
*
* @param df the dataframe
* @param cols columns of the dataframe
* @param quantiles target quantiles to compute
* @param epsilon the precision to achieve
* @param cols numerical columns of the dataframe
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
*
* @return for each column, returns the requested approximations
*/
def multipleApproxQuantiles(
df: DataFrame,
cols: Seq[String],
quantiles: Seq[Double],
epsilon: Double): Seq[Seq[Double]] = {
probabilities: Seq[Double],
relativeError: Double): Seq[Seq[Double]] = {
val columns: Seq[Column] = cols.map { colName =>
val field = df.schema(colName)
require(field.dataType.isInstanceOf[NumericType],
@@ -94,7 +71,7 @@ private[sql] object StatFunctions extends Logging {
Column(Cast(Column(colName).expr, DoubleType))
}
val emptySummaries = Array.fill(cols.size)(
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))

// Note that it works more or less by accident as `rdd.aggregate` is not a pure function:
// this function returns the same array as given in the input (because `aggregate` reuses
@@ -115,40 +92,49 @@
}
val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)

summaries.map { summary => quantiles.map(summary.query) }
summaries.map { summary => probabilities.map(summary.query) }
}
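Since `StatFunctions` is `private[sql]`, only code inside `org.apache.spark.sql` can call this helper; a hedged sketch of such a caller, for some DataFrame `df` in scope (the column names are invented):

```scala
import org.apache.spark.sql.execution.stat.StatFunctions

// One pass over df builds a summary per column; each summary is then
// queried at every probability. "price" and "quantity" are made-up names.
val res: Seq[Seq[Double]] = StatFunctions.multipleApproxQuantiles(
  df, Seq("price", "quantity"), Seq(0.1, 0.5, 0.9), relativeError = 0.05)
val medianPrice = res(0)(1) // probability 0.5 for the first column
```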

/**
* Helper class to compute approximate quantile summary.
* This implementation is based on the algorithm proposed in the paper:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
* and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
*
* In order to optimize for speed, it maintains an internal buffer of the last seen samples,
* and only inserts them after crossing a certain size threshold. This guarantees a near-constant
* runtime complexity compared to the original algorithm.
*
* @param compressThreshold the compression threshold: after the internal buffer of statistics
* crosses this size, it attempts to compress the statistics together
* @param epsilon the target precision
* @param sampled a buffer of quantile statistics. See the G-K article for more details
* @param compressThreshold the compression threshold.
* After the internal buffer of statistics crosses this size, it attempts to compress the
* statistics together.
* @param relativeError the target relative error.
* It is uniform across the complete range of values.
* @param sampled a buffer of quantile statistics.
* See the G-K article for more details.
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
* @param headSampled a buffer of latest samples seen so far
*/
class QuantileSummaries(
val compressThreshold: Int,
val epsilon: Double,
val relativeError: Double,
val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
private[stat] var count: Long = 0L,
val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {

import QuantileSummaries._

/**
* Returns a summary with the given observation inserted into the summary.
* This method may either modify in place the current summary (and return the same summary,
* modified in place), or it may create a new summary from scratch if necessary.
* @param x the new observation to insert into the summary
*/
def insert(x: Double): QuantileSummaries = {
headSampled.append(x)
if (headSampled.size >= defaultHeadSize) {
this.withHeadInserted
this.withHeadBufferInserted
} else {
this
}
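The head-buffer optimization can be read as the following standalone pattern (a simplified sketch, not the actual class): appends stay O(1) until the buffer crosses the threshold, and one sorted batch insertion then amortizes the cost.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the buffered-insert idea; the real class merges
// the sorted batch into its `sampled` statistics instead of discarding it.
class BufferedInserter(threshold: Int) {
  private val head = ArrayBuffer.empty[Double]

  def insert(x: Double): Unit = {
    head.append(x)              // O(1) append to the head buffer
    if (head.size >= threshold) flush()
  }

  private def flush(): Unit = {
    val sorted = head.sorted    // sort once per batch, not once per element
    // ... merge `sorted` into the summary statistics here ...
    head.clear()
  }
}
```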
@@ -162,7 +148,7 @@
*
* @return a new quantile summary object.
*/
private def withHeadInserted: QuantileSummaries = {
private def withHeadBufferInserted: QuantileSummaries = {
if (headSampled.isEmpty) {
return this
}
@@ -187,7 +173,7 @@
if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
0
} else {
math.floor(2 * epsilon * currentCount).toInt
math.floor(2 * relativeError * currentCount).toInt
}

val tuple = Stats(currentSample, 1, delta)
@@ -200,67 +186,80 @@
newSamples.append(sampled(sampleIdx))
sampleIdx += 1
}
new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount)
new QuantileSummaries(compressThreshold, relativeError, newSamples, currentCount)
}

/**
* Returns a new summary that compresses the summary statistics and the head buffer.
*
* This implements the COMPRESS function of the GK algorithm. It does not modify the object.
*
* @return a new summary object with compressed statistics
*/
def compress(): QuantileSummaries = {
// Inserts all the elements first
val inserted = this.withHeadInserted
val inserted = this.withHeadBufferInserted
assert(inserted.headSampled.isEmpty)
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
}
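`compressImmut` itself is not shown in this diff; a plausible sketch of the GK-style pairwise merge it performs, assuming the paper's `Stats(value, g, delta)` encoding (`g` is the rank gap to the previous sample, `delta` the rank uncertainty), might look like:

```scala
import scala.collection.mutable.ArrayBuffer

// Hedged sketch only; the actual compressImmut lives elsewhere in the file.
case class Stats(value: Double, g: Int, delta: Int)

def compressSketch(sampled: IndexedSeq[Stats], mergeThreshold: Double): IndexedSeq[Stats] = {
  if (sampled.size <= 1) return sampled
  val res = ArrayBuffer(sampled.last)
  // Walk right to left; a tuple is absorbed into its successor when the
  // combined uncertainty stays under the threshold (2 * relativeError * count).
  for (i <- (sampled.size - 2) to 1 by -1) {
    val cur = sampled(i)
    val head = res.head
    if (cur.g + head.g + head.delta < mergeThreshold) {
      res(0) = head.copy(g = head.g + cur.g) // merging preserves total rank mass
    } else {
      cur +=: res
    }
  }
  sampled.head +=: res // the minimum is always kept
  res.toIndexedSeq
}
```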

private def shallowCopy: QuantileSummaries = {
new QuantileSummaries(compressThreshold, relativeError, sampled, count, headSampled)
}

/**
* Merges two (compressed) summaries together.
*
* Returns a new summary.
*/
def merge(other: QuantileSummaries): QuantileSummaries = {
require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
if (other.count == 0) {
this
this.shallowCopy
} else if (count == 0) {
other
other.shallowCopy
} else {
// We rely on the fact that they are ordered to efficiently interleave them.
val thisSampled = sampled.toList
val otherSampled = other.sampled.toList
val res: ArrayBuffer[Stats] = ArrayBuffer.empty

@tailrec
def mergeCurrent(
thisList: List[Stats],
otherList: List[Stats]): Unit = (thisList, otherList) match {
case (Nil, l) =>
res.appendAll(l)
case (l, Nil) =>
res.appendAll(l)
case (h1 :: t1, h2 :: t2) if h1.value > h2.value =>
mergeCurrent(otherList, thisList)
case (h1 :: t1, l) =>
// We know that h1.value <= all values in l
// TODO(thunterdb) do we need to adjust g and delta?
res.append(h1)
mergeCurrent(t1, l)
}

mergeCurrent(thisSampled, otherSampled)
val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count)
new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count)
// Merge the two buffers.
// The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
// statistics during the merging: the invariants are still respected after the merge.
// TODO: could replace full sort by ordered merge, the two lists are known to be sorted
// already.
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
other.compressThreshold, other.relativeError, comp, other.count + count)
}
}
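In context, `merge` is what `rdd.aggregate` uses to combine per-partition summaries; a hedged sketch of that flow (the two input sequences are hypothetical):

```scala
// partition1Values and partition2Values stand in for two partitions' data.
def summarize(values: Seq[Double]): QuantileSummaries = {
  val s = values.foldLeft(
    new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError = 0.01)
  )(_.insert(_))
  s.compress() // both sides must be compressed before merge
}

val merged = summarize(partition1Values).merge(summarize(partition2Values))
val approxMedian = merged.query(0.5)
```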

/**
* Runs a query for a given quantile.
* The result follows the approximation guarantees detailed above.
* The query can only be run on a compressed summary: you need to call compress() before using
* it.
*
* @param quantile the target quantile
* @return
*/
def query(quantile: Double): Double = {
require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
require(headSampled.isEmpty,
"Cannot operate on an uncompressed summary, call compress() first")

if (quantile <= epsilon) {
if (quantile <= relativeError) {
return sampled.head.value
}

if (quantile >= 1 - epsilon) {
if (quantile >= 1 - relativeError) {
return sampled.last.value
}

// Target rank
val rank = math.ceil(quantile * count).toInt
val targetError = math.ceil(epsilon * count)
val targetError = math.ceil(relativeError * count)
// Minimum rank at current sample
var minRank = 0
var i = 1
@@ -291,9 +290,10 @@
val defaultHeadSize: Int = 50000

/**
* The default value for epsilon.
* The default value for the relative error (1%).
* With this value, the best extreme percentiles that can be approximated are 1% and 99%.
*/
val defaultEpsilon: Double = 0.01
val defaultRelativeError: Double = 0.01

/**
* Statistics from the Greenwald-Khanna paper.