Repository: spark
Updated Branches:
  refs/heads/master 01e10c9fe -> 4fd199369
[SPARK-6761][SQL] Approximate quantile for DataFrame

JIRA: https://issues.apache.org/jira/browse/SPARK-6761

Compute an approximate quantile based on the paper Greenwald, Michael and Khanna, Sanjeev,
"Space-efficient Online Computation of Quantile Summaries," SIGMOD '01.

Author: Timothy Hunter <[email protected]>
Author: Liang-Chi Hsieh <[email protected]>

Closes #6042 from viirya/approximate_quantile.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fd19936
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fd19936
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fd19936

Branch: refs/heads/master
Commit: 4fd1993692d45a0da0289b8c7669cc1dc3fe0f2b
Parents: 01e10c9
Author: Timothy Hunter <[email protected]>
Authored: Mon Feb 22 23:31:00 2016 -0800
Committer: Xiangrui Meng <[email protected]>
Committed: Mon Feb 22 23:31:00 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/DataFrameStatFunctions.scala    |  10 +
 .../sql/execution/stat/StatFunctions.scala    | 309 +++++++++++++++++++
 .../apache/spark/sql/DataFrameStatSuite.scala |  60 ++++
 .../execution/stat/ApproxQuantileSuite.scala  | 129 ++++++++
 4 files changed, 508 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index bb3cc02..7f110c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -37,6 +37,16 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
 final class DataFrameStatFunctions private[sql](df: DataFrame) {
 
   /**
+   * Calculate the approximate quantile of a numerical column of a DataFrame.
+   * @param col the name of the column
+   * @param quantile the target quantile, a number in [0.0, 1.0]
+   * @param epsilon the target relative error on the rank of the returned value
+   * @return the approximate quantile
+   */
+  def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = {
+    StatFunctions.approxQuantile(df, col, quantile, epsilon)
+  }
+
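As a quick usage illustration of the new method (a minimal sketch, not part of the patch; the DataFrame and column name are invented, in the style of the test suites further down):

    // 1000 rows with values 0.0 ... 999.0 in a column named "value".
    val df = sqlContext.range(0, 1000).selectExpr("cast(id as double) as value")
    // Approximate median, allowing a rank error of up to 1% of the row count.
    val median = df.stat.approxQuantile("value", 0.5, 0.01)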
+  /**
    * Calculate the sample covariance of two numerical columns of a DataFrame.
    * @param col1 the name of the first column
    * @param col2 the name of the second column


http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 7d70194..eb056d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.stat
 
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
@@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object StatFunctions extends Logging {
 
+  import QuantileSummaries.Stats
+
+  /**
+   * Calculates the approximate quantile for the given column.
+   *
+   * If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]].
+   *
+   * Note on the target error.
+   *
+   * The result of this algorithm has the following deterministic bound:
+   * if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
+   * then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
+   * of `x` is close to (phi * N). More precisely:
+   *
+   *   floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
+   *
+   * Note on the algorithm used.
+   *
+   * This method implements a variation of the Greenwald-Khanna algorithm
+   * (with some speed optimizations). The algorithm was first presented in the following article:
+   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+   *
+   * The performance optimizations are detailed in the comments of the implementation.
+   *
+   * @param df the dataframe to estimate quantiles on
+   * @param col the name of the column
+   * @param quantile the target quantile of interest
+   * @param epsilon the target error. Should be >= 0.
+   */
+  def approxQuantile(
+      df: DataFrame,
+      col: String,
+      quantile: Double,
+      epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
+    require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range [0.0, 1.0].")
+    val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
+    res
+  }
+
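To make the deterministic bound concrete (the numbers below are illustrative, not from the patch): with N = 1000 rows, phi = 0.5 and epsi = 0.01, the returned sample x satisfies

    floor((0.5 - 0.01) * 1000) = 490  <=  rank(x)  <=  ceil((0.5 + 0.01) * 1000) = 510

so the exact rank of x lies within 10 positions of the true median.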
+  /**
+   * Runs multiple quantile computations in a single pass, with the same target error.
+   *
+   * See [[approxQuantile]] for more details on the approximation guarantees.
+   *
+   * @param df the dataframe
+   * @param cols columns of the dataframe
+   * @param quantiles target quantiles to compute
+   * @param epsilon the precision to achieve
+   * @return for each column, the requested approximations
+   */
+  def multipleApproxQuantiles(
+      df: DataFrame,
+      cols: Seq[String],
+      quantiles: Seq[Double],
+      epsilon: Double): Seq[Seq[Double]] = {
+    val columns: Seq[Column] = cols.map { colName =>
+      val field = df.schema(colName)
+      require(field.dataType.isInstanceOf[NumericType],
+        s"Quantile calculation for column $colName with data type ${field.dataType}" +
+        " is not supported.")
+      Column(Cast(Column(colName).expr, DoubleType))
+    }
+    val emptySummaries = Array.fill(cols.size)(
+      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
+
+    // Note that this works more or less by accident, as `rdd.aggregate` is not a pure function:
+    // this function returns the same array as given in the input (because `aggregate` reuses
+    // the same argument).
+    def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
+      var i = 0
+      while (i < summaries.length) {
+        summaries(i) = summaries(i).insert(row.getDouble(i))
+        i += 1
+      }
+      summaries
+    }
+
+    def merge(
+        sum1: Array[QuantileSummaries],
+        sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
+      sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
+    }
+    val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)
+
+    summaries.map { summary => quantiles.map(summary.query) }
+  }
+
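For a sense of how the single-pass variant is called (an illustrative sketch, not part of the patch; the performance test further down drives the same entry point):

    // Three quantiles for each of two columns, computed in one pass over the data.
    val df = sqlContext.range(0, 1000).selectExpr("id as singles", "id * 2.0 as doubles")
    val Seq(singleQs, doubleQs) = StatFunctions.multipleApproxQuantiles(
      df, Seq("singles", "doubles"), Seq(0.25, 0.5, 0.75), epsilon = 0.01)
    // singleQs and doubleQs each hold three approximate quantiles.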
+  /**
+   * Helper class to compute an approximate quantile summary.
+   * This implementation is based on the algorithm proposed in the paper:
+   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+   *
+   * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
+   * and only inserts them after crossing a certain size threshold. This guarantees a near-constant
+   * runtime complexity compared to the original algorithm.
+   *
+   * @param compressThreshold the compression threshold: after the internal buffer of statistics
+   *                          crosses this size, it attempts to compress the statistics together
+   * @param epsilon the target precision
+   * @param sampled a buffer of quantile statistics. See the G-K article for more details
+   * @param count the count of all the elements *inserted in the sampled buffer*
+   *              (excluding the head buffer)
+   * @param headSampled a buffer of the latest samples seen so far
+   */
+  class QuantileSummaries(
+      val compressThreshold: Int,
+      val epsilon: Double,
+      val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
+      private[stat] var count: Long = 0L,
+      val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {
+
+    import QuantileSummaries._
+
+    def insert(x: Double): QuantileSummaries = {
+      headSampled.append(x)
+      if (headSampled.size >= defaultHeadSize) {
+        this.withHeadInserted
+      } else {
+        this
+      }
+    }
+
+    /**
+     * Inserts an array of (unsorted) samples in a batch, sorting the array first to traverse
+     * the summary statistics in a single pass.
+     *
+     * This method does not modify the current object; it returns a new copy if necessary.
+     *
+     * @return a new quantile summary object.
+     */
+    private def withHeadInserted: QuantileSummaries = {
+      if (headSampled.isEmpty) {
+        return this
+      }
+      var currentCount = count
+      val sorted = headSampled.toArray.sorted
+      val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
+      // The index into the existing summary statistics (`sampled`).
+      var sampleIdx = 0
+      // The index into the sorted batch of new observations being inserted.
+      var opsIdx: Int = 0
+      while (opsIdx < sorted.length) {
+        val currentSample = sorted(opsIdx)
+        // Add all the existing samples that come before the next observation.
+        while (sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
+          newSamples.append(sampled(sampleIdx))
+          sampleIdx += 1
+        }
+
+        // If it is the first one to insert, or if it is the last one
+        currentCount += 1
+        val delta =
+          if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+            0
+          } else {
+            math.floor(2 * epsilon * currentCount).toInt
+          }
+
+        val tuple = Stats(currentSample, 1, delta)
+        newSamples.append(tuple)
+        opsIdx += 1
+      }
+
+      // Add all the remaining existing samples
+      while (sampleIdx < sampled.size) {
+        newSamples.append(sampled(sampleIdx))
+        sampleIdx += 1
+      }
+      new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount)
+    }
+
+    def compress(): QuantileSummaries = {
+      // Insert all the buffered head elements first
+      val inserted = this.withHeadInserted
+      assert(inserted.headSampled.isEmpty)
+      assert(inserted.count == count + headSampled.size)
+      val compressed =
+        compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
+      new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
+    }
+
+    def merge(other: QuantileSummaries): QuantileSummaries = {
+      if (other.count == 0) {
+        this
+      } else if (count == 0) {
+        other
+      } else {
+        // We rely on the fact that they are ordered to efficiently interleave them.
+        val thisSampled = sampled.toList
+        val otherSampled = other.sampled.toList
+        val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+
+        @tailrec
+        def mergeCurrent(
+            thisList: List[Stats],
+            otherList: List[Stats]): Unit = (thisList, otherList) match {
+          case (Nil, l) =>
+            res.appendAll(l)
+          case (l, Nil) =>
+            res.appendAll(l)
+          case (h1 :: t1, h2 :: t2) if h1.value > h2.value =>
+            mergeCurrent(otherList, thisList)
+          case (h1 :: t1, l) =>
+            // We know that h1.value <= all values in l
+            // TODO(thunterdb) do we need to adjust g and delta?
+            res.append(h1)
+            mergeCurrent(t1, l)
+        }
+
+        mergeCurrent(thisSampled, otherSampled)
+        val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count)
+        new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count)
+      }
+    }
+
+    def query(quantile: Double): Double = {
+      require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
+
+      if (quantile <= epsilon) {
+        return sampled.head.value
+      }
+
+      if (quantile >= 1 - epsilon) {
+        return sampled.last.value
+      }
+
+      // Target rank
+      val rank = math.ceil(quantile * count).toInt
+      val targetError = math.ceil(epsilon * count)
+      // Minimum rank at the current sample
+      var minRank = 0
+      var i = 1
+      while (i < sampled.size - 1) {
+        val curSample = sampled(i)
+        minRank += curSample.g
+        val maxRank = minRank + curSample.delta
+        if (maxRank - targetError <= rank && rank <= minRank + targetError) {
+          return curSample.value
+        }
+        i += 1
+      }
+      sampled.last.value
+    }
+  }
+
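Driving the helper class directly follows the same pattern as the buildSummary helper in ApproxQuantileSuite below (a minimal sketch; the constants are illustrative):

    import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries

    var summary = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon = 0.01)
    // insert() may return a new summary object, so the result has to be kept.
    (0 until 1000).foreach { i => summary = summary.insert(i.toDouble) }
    // compress() folds the buffered head samples in before querying.
    summary = summary.compress()
    val approxMedian = summary.query(0.5)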
+  object QuantileSummaries {
+    // TODO(tjhunter) more tuning could be done on the constants here, but for now
+    // the main cost of the algorithm is accessing the data in SQL.
+
+    /**
+     * The default value for the compression threshold.
+     */
+    val defaultCompressThreshold: Int = 10000
+
+    /**
+     * The size of the head buffer.
+     */
+    val defaultHeadSize: Int = 50000
+
+    /**
+     * The default value for epsilon.
+     */
+    val defaultEpsilon: Double = 0.01
+
+    /**
+     * Statistics from the Greenwald-Khanna paper.
+     * @param value the sampled value
+     * @param g the minimum rank jump from the previous value's minimum rank
+     * @param delta the maximum span of the rank.
+     */
+    case class Stats(value: Double, g: Int, delta: Int)
+
+    private def compressImmut(
+        currentSamples: IndexedSeq[Stats],
+        mergeThreshold: Double): ArrayBuffer[Stats] = {
+      val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+      if (currentSamples.isEmpty) {
+        return res
+      }
+      // Start from the last element, which is always part of the set.
+      // The head contains the current new head, which may be merged with the current element.
+      var head = currentSamples.last
+      var i = currentSamples.size - 2
+      // Do not compress the last element
+      while (i >= 1) {
+        // The current sample:
+        val sample1 = currentSamples(i)
+        // Do we need to compress?
+        if (sample1.g + head.g + head.delta < mergeThreshold) {
+          // Do not insert yet, just merge the current element into the head.
+          head = head.copy(g = head.g + sample1.g)
+        } else {
+          // Prepend the current head, and keep the current sample as the target for merging.
+          res.prepend(head)
+          head = sample1
+        }
+        i -= 1
+      }
+      res.prepend(head)
+      // If necessary, add the minimum element:
+      res.prepend(currentSamples.head)
+      res
+    }
+  }
+
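To see the merge rule in compressImmut on concrete numbers (illustrative values, not from the patch): with epsilon = 0.1 and count = 100 the merge threshold is 2 * 0.1 * 100 = 20. For an adjacent pair Stats(5.0, g = 3, delta = 4) followed by Stats(7.0, g = 2, delta = 6), the check 3 + 2 + 6 = 11 < 20 succeeds, so the pair collapses into the single tuple Stats(7.0, g = 5, delta = 6): summing the g values keeps the rank bookkeeping exact while the buffer shrinks.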
   /** Calculate the Pearson Correlation Coefficient for the given columns */
   private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
     val counts = collectStatisticalData(df, cols, "correlation")


http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index f01f126..7f92292 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -21,6 +21,8 @@ import java.util.Random
 
 import org.scalatest.Matchers._
 
+import org.apache.spark.Logging
+import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.DoubleType
@@ -123,6 +125,26 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(math.abs(decimalRes) < 1e-12)
   }
 
+  test("approximate quantile") {
+    val df = Seq.tabulate(1000)(i => (i, 2.0 * i)).toDF("singles", "doubles")
+
+    val expected_1 = 500.0
+    val expected_2 = 1600.0
+
+    val epsilons = List(0.1, 0.05, 0.001)
+
+    for (epsilon <- epsilons) {
+      val result1 = df.stat.approxQuantile("singles", 0.5, epsilon)
+      val result2 = df.stat.approxQuantile("doubles", 0.8, epsilon)
+
+      val error_1 = 2 * 1000 * epsilon
+      val error_2 = 2 * 2000 * epsilon
+
+      assert(math.abs(result1 - expected_1) < error_1)
+      assert(math.abs(result2 - expected_2) < error_2)
+    }
+  }
+
   test("crosstab") {
     val rng = new Random()
     val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10)))
@@ -269,3 +291,41 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(0.until(1000).forall(i => filter4.mightContain(i * 3)))
   }
 }
+
+
+class DataFrameStatPerfSuite extends QueryTest with SharedSQLContext with Logging {
+
+  // Turn on this test if you want to test the performance of approximate quantiles.
+  ignore("describe() should not be slowed down too much by quantiles") {
+    val df = sqlContext.range(5000000L).toDF("col1").cache()
+    def millis(f: => Any): Double = {
+      // Do some warmup
+      logDebug("warmup...")
+      for (i <- 1 to 10) {
+        df.count()
+        f
+      }
+      logDebug("execute...")
+      // Run it 10 times and report the average
+      val times = (1 to 10).map { i =>
+        val start = System.nanoTime()
+        f
+        val end = System.nanoTime()
+        (end - start) / 1e9
+      }
+      logDebug("execute done")
+      times.sum.toDouble / times.length.toDouble
+
+    }
+
+    logDebug("*** Normal describe ***")
+    val t1 = millis { df.describe() }
+    logDebug(s"T1 = $t1")
+    logDebug("*** Just quantiles ***")
+    val t2 = millis {
+      StatFunctions.multipleApproxQuantiles(df, Seq("col1"), Seq(0.1, 0.25, 0.5, 0.75, 0.9), 0.01)
+    }
+    logDebug(s"T1 = $t1, T2 = $t2")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
new file mode 100644
index 0000000..6992b4c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.stat
+
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries
+
+
+class ApproxQuantileSuite extends SparkFunSuite {
+
+  private val r = new Random(1)
+  private val n = 100
+  private val increasing = "increasing" -> (0 until n).map(_.toDouble)
+  private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
+  private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
+
+  private def buildSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x)
+    }
+    summary.compress()
+  }
+
+  private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
+    val approx = summary.query(quant)
+    // The rank of the approximation.
+    val rank = data.count(_ < approx) // has to be <, not <= to be exact
+    val lower = math.floor((quant - summary.epsilon) * data.size)
+    assert(rank >= lower,
+      s"approx_rank: $rank ! >= $lower, requested quantile = $quant")
+    val upper = math.ceil((quant + summary.epsilon) * data.size)
+    assert(rank <= upper,
+      s"approx_rank: $rank ! <= $upper, requested quantile = $quant")
+  }
+
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+    }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+
+  // Tests for merging procedure
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    val (data1, data2) = {
+      val l = data.size
+      data.take(l / 2) -> data.drop(l / 2)
+    }
+
+    test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data1, epsi, compression)
+      val s2 = buildSummary(data2, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+
+    val (data11, data12) = {
+      data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
+    }
+
+    test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data11, epsi, compression)
+      val s2 = buildSummary(data12, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
