Repository: spark Updated Branches: refs/heads/branch-1.1 e0bc333b6 -> 12f16ba3f
[SPARK-2862] histogram method fails on some choices of bucketCount Author: Chandan Kumar <[email protected]> Closes #1787 from nrchandan/spark-2862 and squashes the following commits: a76bbf6 [Chandan Kumar] [SPARK-2862] Fix for a broken test case and add new test cases 4211eea [Chandan Kumar] [SPARK-2862] Add Scala bug id 13854f1 [Chandan Kumar] [SPARK-2862] Use shorthand range notation to avoid Scala bug (cherry picked from commit f45efbb8aaa65bc46d65e77e93076fbc29f4455d) Signed-off-by: Xiangrui Meng <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12f16ba3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12f16ba3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12f16ba3 Branch: refs/heads/branch-1.1 Commit: 12f16ba3fa1f3cde9f43c094029017f4192b1bac Parents: e0bc333 Author: Chandan Kumar <[email protected]> Authored: Mon Aug 18 09:52:25 2014 -0700 Committer: Xiangrui Meng <[email protected]> Committed: Mon Aug 18 09:52:33 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/rdd/DoubleRDDFunctions.scala | 15 +++++++++---- .../org/apache/spark/rdd/DoubleRDDSuite.scala | 23 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/12f16ba3/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index f233544..e0494ee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * If the elements in RDD do not vary (max == min) always returns a single bucket. */ def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { - // Compute the minimum and the maxium + // Scala's built-in range has issues. See #SI-8782 + def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = { + val span = max - min + Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max + } + // Compute the minimum and the maximum val (max: Double, min: Double) = self.mapPartitions { items => Iterator(items.foldRight(Double.NegativeInfinity, Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) => @@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { throw new UnsupportedOperationException( "Histogram on either an empty RDD or RDD containing +/-infinity or NaN") } - val increment = (max-min)/bucketCount.toDouble - val range = if (increment != 0) { - Range.Double.inclusive(min, max, increment) + val range = if (min != max) { + // Range.Double.inclusive(min, max, increment) + // The above code doesn't always work. See Scala bug #SI-8782. + // https://issues.scala-lang.org/browse/SI-8782 + customRange(min, max, bucketCount) } else { List(min, min) } http://git-wip-us.apache.org/repos/asf/spark/blob/12f16ba3/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala index a822bd1..f89bdb6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala @@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext { assert(histogramBuckets === expectedHistogramBuckets) } + test("WorksWithoutBucketsForLargerDatasets") { + // Verify the case of slighly larger datasets + val rdd = sc.parallelize(6 to 99) + val (histogramBuckets, histogramResults) = rdd.histogram(8) + val expectedHistogramResults = + Array(12, 12, 11, 12, 12, 11, 12, 12) + val expectedHistogramBuckets = + Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + + test("WorksWithoutBucketsWithIrrationalBucketEdges") { + // Verify the case of buckets with irrational edges. See #SPARK-2862. + val rdd = sc.parallelize(6 to 99) + val (histogramBuckets, histogramResults) = rdd.histogram(9) + val expectedHistogramResults = + Array(11, 10, 11, 10, 10, 11, 10, 10, 11) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets(0) === 6.0) + assert(histogramBuckets(9) === 99.0) + } + // Test the failure mode with an invalid RDD test("ThrowsExceptionOnInvalidRDDs") { // infinity --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
