Repository: spark
Updated Branches:
  refs/heads/master c6df5f66d -> b963c19a8


[SPARK-10164] [MLLIB] Fixed GMM distributed decomposition bug

GaussianMixture now distributes matrix decompositions for certain problem 
sizes. The distributed computation path previously failed, but this was not 
covered by unit tests.

This PR adds a unit test which checks this.  It failed previously but works 
with this fix.

CC: mengxr

Author: Joseph K. Bradley <jos...@databricks.com>

Closes #8370 from jkbradley/gmm-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b963c19a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b963c19a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b963c19a

Branch: refs/heads/master
Commit: b963c19a803c5a26c9b65655d40ca6621acf8bd4
Parents: c6df5f6
Author: Joseph K. Bradley <jos...@databricks.com>
Authored: Sun Aug 23 18:34:07 2015 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Sun Aug 23 18:34:07 2015 -0700

----------------------------------------------------------------------
 .../mllib/clustering/GaussianMixture.scala      | 22 +++++++++++++-------
 .../mllib/clustering/GaussianMixtureSuite.scala | 22 ++++++++++++++++++--
 2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b963c19a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index fcc9dfe..daa947e 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -169,9 +169,7 @@ class GaussianMixture private (
     // Get length of the input vectors
     val d = breezeData.first().length
 
-    // Heuristic to distribute the computation of the 
[[MultivariateGaussian]]s, approximately when
-    // d > 25 except for when k is very small
-    val distributeGaussians = ((k - 1.0) / k) * d > 25
+    val shouldDistributeGaussians = 
GaussianMixture.shouldDistributeGaussians(k, d)
 
     // Determine initial weights and corresponding Gaussians.
     // If the user supplied an initial GMM, we use those values, otherwise
@@ -205,15 +203,15 @@ class GaussianMixture private (
       // (often referred to as the "M" step in literature)
       val sumWeights = sums.weights.sum
 
-      if (distributeGaussians) {
+      if (shouldDistributeGaussians) {
         val numPartitions = math.min(k, 1024)
         val tuples =
           Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), 
sums.weights(i)))
         val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, 
sigma, weight) =>
           updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
-        }.collect.unzip
-        Array.copy(ws, 0, weights, 0, ws.length)
-        Array.copy(gs, 0, gaussians, 0, gs.length)
+        }.collect().unzip
+        Array.copy(ws.toArray, 0, weights, 0, ws.length)
+        Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
       } else {
         var i = 0
         while (i < k) {
@@ -271,6 +269,16 @@ class GaussianMixture private (
   }
 }
 
+private[clustering] object GaussianMixture {
+  /**
+   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, 
approximately when
+   * d > 25 except for when k is very small.
+   * @param k  Number of Gaussians (mixture components)
+   * @param d  Number of features
+   */
+  def shouldDistributeGaussians(k: Int, d: Int): Boolean = ((k - 1.0) / k) * d 
> 25
+}
+
 // companion class to provide zero constructor for ExpectationSum
 private object ExpectationSum {
   def zero(k: Int, d: Int): ExpectationSum = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b963c19a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
index b636d02..a72723e 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.clustering
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vectors, Matrices}
+import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrices}
 import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
@@ -76,6 +76,20 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     assert(gmm.gaussians(1).sigma ~== Esigma(1) absTol 1E-3)
   }
 
+  test("two clusters with distributed decompositions") {
+    val data = sc.parallelize(GaussianTestData.data2, 2)
+
+    val k = 5
+    val d = data.first().size
+    assert(GaussianMixture.shouldDistributeGaussians(k, d))
+
+    val gmm = new GaussianMixture()
+      .setK(k)
+      .run(data)
+
+    assert(gmm.k === k)
+  }
+
   test("single cluster with sparse data") {
     val data = sc.parallelize(Array(
       Vectors.sparse(3, Array(0, 2), Array(4.0, 2.0)),
@@ -116,7 +130,7 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     val sparseGMM = new GaussianMixture()
       .setK(2)
       .setInitialModel(initialGmm)
-      .run(data)
+      .run(sparseData)
 
     assert(sparseGMM.weights(0) ~== Ew(0) absTol 1E-3)
     assert(sparseGMM.weights(1) ~== Ew(1) absTol 1E-3)
@@ -168,5 +182,9 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
     )
 
+    val data2: Array[Vector] = Array.tabulate(25){ i: Int =>
+      Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+    }
+
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to