This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e7fa778  [SPARK-30699][ML][PYSPARK] GMM blockify input vectors
e7fa778 is described below

commit e7fa778dc7a695d3b1426de6f98a401f2fb98f39
Author: zhengruifeng <[email protected]>
AuthorDate: Tue May 12 12:54:03 2020 +0800

    [SPARK-30699][ML][PYSPARK] GMM blockify input vectors
    
    ### What changes were proposed in this pull request?
    1, add a new param `blockSize`;
    2, if blockSize == 1, keep the original behavior, code path trainOnRows;
    3, if blockSize > 1, standardize and stack the input vectors into blocks (like ALS/MLP), code path trainOnBlocks.
    
    ### Why are the changes needed?
    Performance gains on the dense dataset HIGGS:
    1, saves about 45% RAM;
    2, about 3X faster with OpenBLAS.
    
    ### Does this PR introduce any user-facing change?
    Yes, this adds a new expert param `blockSize`.
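
    A rough usage sketch of the new param (the k/blockSize values and `dataset` are hypothetical, not from this patch):

        import org.apache.spark.ml.clustering.GaussianMixture

        val gmm = new GaussianMixture()
          .setK(3)
          .setMaxIter(100)
          .setBlockSize(64)  // expert param added by this PR; 1 keeps the row-wise path
        val model = gmm.fit(dataset)  // `dataset` is an assumed DataFrame with a features column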
    
    ### How was this patch tested?
    Added test suites.
    
    Closes #27473 from zhengruifeng/blockify_gmm.
    
    Authored-by: zhengruifeng <[email protected]>
    Signed-off-by: zhengruifeng <[email protected]>
---
 .../scala/org/apache/spark/ml/linalg/BLAS.scala    |   2 +-
 .../stat/distribution/MultivariateGaussian.scala   |  32 +-
 .../distribution/MultivariateGaussianSuite.scala   |  10 +
 .../spark/ml/clustering/GaussianMixture.scala      | 324 ++++++++++++++++-----
 .../spark/ml/clustering/GaussianMixtureSuite.scala |  11 +
 python/pyspark/ml/clustering.py                    |  22 +-
 6 files changed, 325 insertions(+), 76 deletions(-)

diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 3d3e7a2..368f177 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -271,7 +271,7 @@ private[spark] object BLAS extends Serializable {
   }
 
   /**
-   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR.
+   * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR.
    *
   * @param U the upper triangular part of the matrix packed in an array (column major)
    */
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
index 42746b5..a08b8af 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
@@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") (
    */
   @transient private lazy val tuple = {
     val (rootSigmaInv, u) = calculateCovarianceConstants
-    val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv)
+    val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense
     val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean)
     (rootSigmaInvMat, u, rootSigmaInvMulMu)
   }
@@ -81,6 +81,36 @@ class MultivariateGaussian @Since("2.0.0") (
     u - 0.5 * BLAS.dot(v, v)
   }
 
+  private[ml] def pdf(X: Matrix): DenseVector = {
+    val mat = DenseMatrix.zeros(X.numRows, X.numCols)
+    pdf(X, mat)
+  }
+
+  private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = {
+    require(!mat.isTransposed)
+
+    BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat)
+    val m = mat.numRows
+    val n = mat.numCols
+
+    val pdfVec = mat.multiply(rootSigmaInvMulMu)
+
+    val blas = BLAS.getBLAS(n)
+    val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1)
+
+    val localU = u
+    var i = 0
+    while (i < m) {
+      val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m)
+      val dot = pdfVec(i)
+      val squaredSum = squared1 + squared2 - dot - dot
+      pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum)
+      i += 1
+    }
+
+    pdfVec
+  }
+
   /**
    * Calculate distribution dependent components used for the density function:
   *    pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))
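
For reference, the blocked pdf added above relies on the expansion |R(x - mu)|^2 = |R mu|^2 + |R x|^2 - 2 (R x).(R mu), where R = rootSigmaInv: one GEMM (mat = X * R.t) plus one GEMV (pdfVec = mat * (R mu)) then covers a whole block, with squared1 = |R mu|^2 and squared2 = |R x_i|^2 as in the loop. A per-row sketch of the same computation (illustrative only; `pdfRow`, the plain arrays, and the row-major `r` are assumptions, not part of this patch):

    // Reference implementation of the blocked pdf, one row at a time.
    // r is rootSigmaInv as a row-major n x n array, rMu = R * mu, u as in the class.
    def pdfRow(x: Array[Double], r: Array[Array[Double]], rMu: Array[Double], u: Double): Double = {
      val n = x.length
      val rx = Array.tabulate(n) { i => (0 until n).map(j => r(i)(j) * x(j)).sum }  // R * x
      val squared1 = rMu.map(v => v * v).sum                                        // |R mu|^2
      val squared2 = rx.map(v => v * v).sum                                         // |R x|^2
      val dot = rx.zip(rMu).map { case (a, b) => a * b }.sum                        // (R x).(R mu)
      math.exp(u - 0.5 * (squared1 + squared2 - 2 * dot))
    }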
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
index f2ecff1..8652d31 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
@@ -27,6 +27,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
   test("univariate") {
     val x1 = Vectors.dense(0.0)
     val x2 = Vectors.dense(1.5)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0)
     val sigma1 = Matrices.dense(1, 1, Array(1.0))
@@ -35,6 +36,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5)
     assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
     assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)
+    assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5)
 
     val sigma2 = Matrices.dense(1, 1, Array(4.0))
     val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -42,11 +44,13 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5)
     assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
     assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
+    assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5)
   }
 
   test("multivariate") {
     val x1 = Vectors.dense(0.0, 0.0)
     val x2 = Vectors.dense(1.0, 1.0)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0, 0.0)
     val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
@@ -55,6 +59,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5)
     assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
     assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)
+    assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5)
 
     val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
     val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -62,21 +67,25 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5)
     assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
     assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
+    assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5)
   }
 
   test("multivariate degenerate") {
     val x1 = Vectors.dense(0.0, 0.0)
     val x2 = Vectors.dense(1.0, 1.0)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0, 0.0)
     val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
     val dist = new MultivariateGaussian(mu, sigma)
     assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
     assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
+    assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5)
   }
 
   test("SPARK-11302") {
     val x = Vectors.dense(629, 640, 1.7188, 618.19)
+    val mat = Matrices.fromVectors(Seq(x))
     val mu = Vectors.dense(
      1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697)
     val sigma = Matrices.dense(4, 4, Array(
@@ -87,5 +96,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     val dist = new MultivariateGaussian(mu, sigma)
     // Agrees with R's dmvnorm: 7.154782e-05
     assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9)
+    assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5)
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index 1c4560a..6d4137b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol
   with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol
-  with HasAggregationDepth {
+  with HasAggregationDepth with HasBlockSize {
 
   /**
    * Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
@@ -279,8 +279,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
    * @param weights  Weights for each Gaussian
    * @return  Probability (partial assignment) for each of the k clusters
    */
-  private[clustering]
-  def computeProbabilities(
+  private[clustering] def computeProbabilities(
       features: Vector,
       dists: Array[MultivariateGaussian],
       weights: Array[Double]): Array[Double] = {
@@ -376,6 +375,25 @@ class GaussianMixture @Since("2.0.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
 
   /**
+   * Set block size for stacking input data in matrices.
+   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+   * if blockSize &gt; 1, then vectors will be stacked into blocks, and high-level BLAS routines
+   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+   * Recommended size is between 10 and 1000. An appropriate choice of block size depends on
+   * the sparsity and dimension of the input datasets, the underlying BLAS implementation
+   * (for example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (for example, the
+   * number of threads).
+   * Note that existing BLAS implementations are mainly optimized for dense matrices; if the
+   * input dataset is sparse, stacking may bring no performance gain, or worse, a performance
+   * regression.
+   * Default is 1.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.1.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  setDefault(blockSize -> 1)
+
+  /**
    * Number of samples per cluster to use when initializing Gaussians.
    */
   private val numSamples = 5
@@ -392,7 +410,11 @@ class GaussianMixture @Since("2.0.0") (
       s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of 
the covariance" +
       s" matrix is quadratic in the number of features.")
 
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
+      seed, tol, aggregationDepth, blockSize)
+    instr.logNumFeatures(numFeatures)
 
     val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
       col($(weightCol)).cast(DoubleType)
@@ -401,25 +423,50 @@ class GaussianMixture @Since("2.0.0") (
     }
 
     val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w)
-      .as[(Vector, Double)]
-      .rdd
+      .as[(Vector, Double)].rdd
+      .setName("training instances")
 
-    if (handlePersistence) {
+    if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) {
       instances.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
-    val sc = spark.sparkContext
-    val numClusters = $(k)
+    // TODO: SPARK-15785 Support users supplied initial GMM.
+    val (weights, gaussians) = initRandom(instances, $(k), numFeatures)
 
-    instr.logPipelineStage(this)
-    instr.logDataset(dataset)
-    instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
-      seed, tol, aggregationDepth)
-    instr.logNumFeatures(numFeatures)
+    val (logLikelihood, iteration) = if ($(blockSize) == 1) {
+      trainOnRows(instances, weights, gaussians, numFeatures, instr)
+    } else {
+      val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean()
+      instr.logNamedValue("sparsity", sparsity.toString)
+      if (sparsity > 0.5) {
+        logWarning(s"sparsity of input dataset is $sparsity, " +
+          s"which may hurt performance in high-level BLAS.")
+      }
+      trainOnBlocks(instances, weights, gaussians, numFeatures, instr)
+    }
+    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
 
-    // TODO: SPARK-15785 Support users supplied initial GMM.
-    val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
+    val gaussianDists = gaussians.map { case (mean, covVec) =>
+      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists))
+      .setParent(this)
+    val summary = new GaussianMixtureSummary(model.transform(dataset),
+      $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
+    instr.logNamedValue("logLikelihood", logLikelihood)
+    instr.logNamedValue("clusterSizes", summary.clusterSizes)
+    model.setSummary(Some(summary))
+  }
 
+  private def trainOnRows(
+      instances: RDD[(Vector, Double)],
+      weights: Array[Double],
+      gaussians: Array[(DenseVector, DenseVector)],
+      numFeatures: Int,
+      instr: Instrumentation): (Double, Int) = {
+    val sc = instances.sparkContext
     var logLikelihood = Double.MinValue
     var logLikelihoodPrev = 0.0
 
@@ -440,7 +487,7 @@ class GaussianMixture @Since("2.0.0") (
           val ws = agg.weights.sum
           if (iteration == 0) weightSumAccum.add(ws)
           logLikelihoodAccum.add(agg.logLikelihood)
-          Iterator.tabulate(numClusters) { i =>
+          Iterator.tabulate(bcWeights.value.length) { i =>
             (i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
           }
         } else Iterator.empty
@@ -471,21 +518,77 @@ class GaussianMixture @Since("2.0.0") (
       instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
       iteration += 1
     }
-    if (handlePersistence) {
-      instances.unpersist()
-    }
 
-    val gaussianDists = gaussians.map { case (mean, covVec) =>
-      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
-      new MultivariateGaussian(mean, cov)
+    (logLikelihood, iteration)
+  }
+
+  private def trainOnBlocks(
+      instances: RDD[(Vector, Double)],
+      weights: Array[Double],
+      gaussians: Array[(DenseVector, DenseVector)],
+      numFeatures: Int,
+      instr: Instrumentation): (Double, Int) = {
+    val blocks = instances.mapPartitions { iter =>
+      iter.grouped($(blockSize))
+        .map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
+     .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+    val sc = instances.sparkContext
+    var logLikelihood = Double.MinValue
+    var logLikelihoodPrev = 0.0
+
+    var iteration = 0
+    while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
+      val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null
+      val logLikelihoodAccum = sc.doubleAccumulator
+      val bcWeights = sc.broadcast(weights)
+      val bcGaussians = sc.broadcast(gaussians)
+
+      // aggregate the cluster contribution for all sample points,
+      // and then compute the new distributions
+      blocks.mapPartitions { iter =>
+        if (iter.nonEmpty) {
+          val agg = new BlockExpectationAggregator(numFeatures,
+            $(blockSize), bcWeights, bcGaussians)
+          while (iter.hasNext) { agg.add(iter.next) }
+          // sum of weights in this partition
+          val ws = agg.weights.sum
+          if (iteration == 0) weightSumAccum.add(ws)
+          logLikelihoodAccum.add(agg.logLikelihood)
+          agg.meanIter.zip(agg.covIter).zipWithIndex
+            .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
+        } else Iterator.empty
+      }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
+        // update the weights, means and covariances for i-th distributions
+        BLAS.axpy(1.0, mean2, mean1)
+        BLAS.axpy(1.0, cov2, cov1)
+        (mean1, cov1, w1 + w2, ws1 + ws2)
+      }.mapValues { case (mean, cov, w, ws) =>
+        // Create new distributions based on the partial assignments
+        // (often referred to as the "M" step in literature)
+        GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
+      }.collect().foreach { case (i, (weight, gaussian)) =>
+        weights(i) = weight
+        gaussians(i) = gaussian
+      }
+
+      bcWeights.destroy()
+      bcGaussians.destroy()
+
+      if (iteration == 0) {
+        instr.logNumExamples(weightSumAccum.count)
+        instr.logSumOfWeights(weightSumAccum.value)
+      }
+
+      logLikelihoodPrev = logLikelihood         // current becomes previous
+      logLikelihood = logLikelihoodAccum.value  // this is the freshly computed log-likelihood
+      instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
+      iteration += 1
     }
+    blocks.unpersist()
 
-    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
-    val summary = new GaussianMixtureSummary(model.transform(dataset),
-      $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
-    instr.logNamedValue("logLikelihood", logLikelihood)
-    instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    model.setSummary(Some(summary))
+    (logLikelihood, iteration)
   }
 
   @Since("2.0.0")
@@ -626,16 +729,15 @@ private class ExpectationAggregator(
     bcWeights: Broadcast[Array[Double]],
     bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
 
-  private val k: Int = bcWeights.value.length
-  private var totalCnt: Long = 0L
-  private var newLogLikelihood: Double = 0.0
-  private lazy val newWeights: Array[Double] = Array.ofDim[Double](k)
-  private lazy val newMeans: Array[DenseVector] = Array.fill(k)(
-    new DenseVector(Array.ofDim[Double](numFeatures)))
-  private lazy val newCovs: Array[DenseVector] = Array.fill(k)(
-    new DenseVector(Array.ofDim[Double](numFeatures * (numFeatures + 1) / 2)))
+  private val k = bcWeights.value.length
+  private var totalCnt = 0L
+  private var newLogLikelihood = 0.0
+  private val covSize = numFeatures * (numFeatures + 1) / 2
+  private lazy val newWeights = Array.ofDim[Double](k)
+  @transient private lazy val newMeans = Array.fill(k)(Vectors.zeros(numFeatures).toDense)
+  @transient private lazy val newCovs = Array.fill(k)(Vectors.zeros(covSize).toDense)
 
-  @transient private lazy val oldGaussians = {
+  @transient private lazy val gaussians = {
     bcGaussians.value.map { case (mean, covVec) =>
       val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
       new MultivariateGaussian(mean, cov)
@@ -656,19 +758,19 @@ private class ExpectationAggregator(
    * Add a new training instance to this ExpectationAggregator, update the weights,
    * means and covariances for each distribution, and update the log likelihood.
    *
-   * @param weightedVector The instance of data point to be added.
+   * @param instance The data point (with its weight) to be added.
    * @return This ExpectationAggregator object.
    */
-  def add(weightedVector: (Vector, Double)): this.type = {
-    val (instance: Vector, weight: Double) = weightedVector
+  def add(instance: (Vector, Double)): this.type = {
+    val (vector: Vector, weight: Double) = instance
     val localWeights = bcWeights.value
-    val localOldGaussians = oldGaussians
+    val localGaussians = gaussians
 
     val prob = new Array[Double](k)
     var probSum = 0.0
     var i = 0
     while (i < k) {
-      val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
+      val p = EPSILON + localWeights(i) * localGaussians(i).pdf(vector)
       prob(i) = p
       probSum += p
       i += 1
@@ -682,42 +784,128 @@ private class ExpectationAggregator(
     while (i < k) {
       val w = prob(i) / probSum * weight
       localNewWeights(i) += w
-      BLAS.axpy(w, instance, localNewMeans(i))
-      BLAS.spr(w, instance, localNewCovs(i))
+      BLAS.axpy(w, vector, localNewMeans(i))
+      BLAS.spr(w, vector, localNewCovs(i))
       i += 1
     }
 
     totalCnt += 1
     this
   }
+}
+
+
+/**
+ * BlockExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param blockSize The maximum number of vectors stacked per block.
+ * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
+ *                    in the mixture. Note that only the upper triangular part of the
+ *                    covariance matrix of each distribution is stored as a dense vector
+ *                    (column major) in order to reduce shuffled data size.
+ */
+private class BlockExpectationAggregator(
+    numFeatures: Int,
+    blockSize: Int,
+    bcWeights: Broadcast[Array[Double]],
+    bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k = bcWeights.value.length
+  private var totalCnt = 0L
+  private var newLogLikelihood = 0.0
+  private val covSize = numFeatures * (numFeatures + 1) / 2
+  private lazy val newWeights = Array.ofDim[Double](k)
+  @transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k)
+  @transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k)
+  @transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k)
+  @transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
+  @transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense
+
+  @transient private lazy val gaussians = {
+    bcGaussians.value.map { case (mean, covVec) =>
+      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+      new MultivariateGaussian(mean, cov)
+    }
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense)
+
+  def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense)
 
   /**
-   * Merge another ExpectationAggregator, update the weights, means and covariances
-   * for each distributions, and update the log likelihood.
-   * (Note that it's in place merging; as a result, `this` object will be modified.)
+   * Add a new training instance block to this BlockExpectationAggregator, update the weights,
+   * means and covariances for each distribution, and update the log likelihood.
    *
-   * @param other The other ExpectationAggregator to be merged.
-   * @return This ExpectationAggregator object.
+   * @param block The block of data points (matrix of vectors, array of weights) to be added.
+   * @return This BlockExpectationAggregator object.
    */
-  def merge(other: ExpectationAggregator): this.type = {
-    if (other.count != 0) {
-      totalCnt += other.totalCnt
-
-      val localThisNewWeights = this.newWeights
-      val localOtherNewWeights = other.newWeights
-      val localThisNewMeans = this.newMeans
-      val localOtherNewMeans = other.newMeans
-      val localThisNewCovs = this.newCovs
-      val localOtherNewCovs = other.newCovs
-      var i = 0
-      while (i < k) {
-        localThisNewWeights(i) += localOtherNewWeights(i)
-        BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
-        BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))
-        i += 1
-      }
-      newLogLikelihood += other.newLogLikelihood
+  def add(block: (Matrix, Array[Double])): this.type = {
+    val (matrix: Matrix, weights: Array[Double]) = block
+    require(matrix.isTransposed)
+    val size = matrix.numRows
+    require(weights.length == size)
+
+    val blas1 = BLAS.getBLAS(size)
+    val blas2 = BLAS.getBLAS(k)
+
+    val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k)
+    require(!probMat.isTransposed)
+    java.util.Arrays.fill(probMat.values, EPSILON)
+
+    val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures)
+    var j = 0
+    while (j < k) {
+      val pdfVec = gaussians(j).pdf(matrix, pdfMat)
+      blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1)
+      j += 1
+    }
+
+    var i = 0
+    while (i < size) {
+      val weight = weights(i)
+      val probSum = blas2.dasum(k, probMat.values, i, size)
+      blas2.dscal(k, weight / probSum, probMat.values, i, size)
+      blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1)
+      newLogLikelihood += math.log(probSum) * weight
+      i += 1
+    }
+
+    BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat)
+
+    // compute the cov vector for each row vector
+    val covVec = auxiliaryCovVec
+    val covVecIter = matrix match {
+      case dm: DenseMatrix =>
+        Iterator.tabulate(size) { i =>
+          java.util.Arrays.fill(covVec.values, 0.0)
+          // when input block is dense, directly use nativeBLAS to avoid array copy
+          BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1,
+            covVec.values, 0)
+          covVec
+        }
+
+      case sm: SparseMatrix =>
+        sm.rowIter.map { vec =>
+          java.util.Arrays.fill(covVec.values, 0.0)
+          BLAS.spr(1.0, vec, covVec)
+          covVec
+        }
     }
+
+    covVecIter.zipWithIndex.foreach { case (covVec, i) =>
+      BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1,
+        probMat.values, i, size, newCovsMat.values, 0, covSize)
+    }
+
+    totalCnt += size
+
     this
   }
 }
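
To make the trainOnBlocks path above easier to follow, here is a minimal standalone sketch of the stacking step it performs (grouping weighted instance vectors into (Matrix, weights) blocks); the `blockify` name and its inputs are illustrative, not part of this patch:

    import org.apache.spark.ml.linalg.{Matrices, Matrix, Vector}
    import org.apache.spark.rdd.RDD

    // Stack weighted instance vectors into (Matrix, weights) blocks, as trainOnBlocks does.
    // Note: Matrices.fromVectors is package-private, so this sketch assumes it runs
    // inside the org.apache.spark.ml package.
    def blockify(instances: RDD[(Vector, Double)], blockSize: Int): RDD[(Matrix, Array[Double])] = {
      instances.mapPartitions { iter =>
        iter.grouped(blockSize)
          .map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
      }
    }

Each resulting block then drives a handful of high-level BLAS calls per iteration (GEMM for the pdfs, daxpy/dasum/dscal for the responsibilities, dger for the rank-1 covariance updates) instead of k DOT/SPR calls per vector.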
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index b35f964..d848d5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -285,6 +285,17 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
     testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset,
       model.getFeaturesCol, model.getProbabilityCol)
   }
+
+  test("GMM on blocks") {
+    Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset =>
+      val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed)
+      val model = gmm.fit(dataset)
+      Seq(2, 4, 8, 16, 32).foreach { blockSize =>
+        val model2 = gmm.setBlockSize(blockSize).fit(dataset)
+        modelEquals(model, model2)
+      }
+    }
+  }
 }
 
 object GaussianMixtureSuite extends SparkFunSuite {
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 984ca41..8f0e9aa 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -98,7 +98,8 @@ class ClusteringSummary(JavaWrapper):
 
 @inherit_doc
 class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
-                             HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol):
+                             HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol,
+                             HasBlockSize):
     """
     Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`.
 
@@ -243,6 +244,8 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
     >>> gm.getMaxIter()
     30
     >>> model = gm.fit(df)
+    >>> model.getBlockSize()
+    1
     >>> model.getAggregationDepth()
     2
     >>> model.getFeaturesCol()
@@ -327,16 +330,16 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
     @keyword_only
     def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
                  probabilityCol="probability", tol=0.01, maxIter=100, 
seed=None,
-                 aggregationDepth=2, weightCol=None):
+                 aggregationDepth=2, weightCol=None, blockSize=1):
         """
         __init__(self, featuresCol="features", predictionCol="prediction", k=2, \
                  probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
-                 aggregationDepth=2, weightCol=None)
+                 aggregationDepth=2, weightCol=None, blockSize=1)
         """
         super(GaussianMixture, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture",
                                             self.uid)
-        self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2)
+        self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -347,11 +350,11 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
     @since("2.0.0")
     def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
                   probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
-                  aggregationDepth=2, weightCol=None):
+                  aggregationDepth=2, weightCol=None, blockSize=1):
         """
         setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
                   probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
-                  aggregationDepth=2, weightCol=None)
+                  aggregationDepth=2, weightCol=None, blockSize=1)
 
         Sets params for GaussianMixture.
         """
@@ -421,6 +424,13 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
         """
         return self._set(aggregationDepth=value)
 
+    @since("3.1.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class GaussianMixtureSummary(ClusteringSummary):
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
