This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e7fa778 [SPARK-30699][ML][PYSPARK] GMM blockify input vectors
e7fa778 is described below
commit e7fa778dc7a695d3b1426de6f98a401f2fb98f39
Author: zhengruifeng <[email protected]>
AuthorDate: Tue May 12 12:54:03 2020 +0800
[SPARK-30699][ML][PYSPARK] GMM blockify input vectors
### What changes were proposed in this pull request?
1. Adds a new param `blockSize`;
2. if blockSize == 1, the original behavior is kept (code path trainOnRows);
3. if blockSize > 1, input vectors are standardized and stacked into blocks (like ALS/MLP); code path trainOnBlocks.
### Why are the changes needed?
Performance gains on the dense dataset HIGGS:
1. saves about 45% RAM;
2. about 3X faster with OpenBLAS.
### Does this PR introduce any user-facing change?
Adds a new expert param `blockSize`.
### How was this patch tested?
Added test suites.
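For illustration, a minimal usage sketch of the new expert param (`df` here is a placeholder DataFrame with a "features" vector column, not part of this patch):

```scala
import org.apache.spark.ml.clustering.GaussianMixture

// blockSize = 1 keeps the per-row path (trainOnRows);
// blockSize > 1 stacks input vectors into matrices (trainOnBlocks).
val gmm = new GaussianMixture()
  .setK(3)
  .setMaxIter(100)
  .setBlockSize(64)  // expert param added by this PR; default is 1

val model = gmm.fit(df)
println(model.weights.mkString(", "))
```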
Closes #27473 from zhengruifeng/blockify_gmm.
Authored-by: zhengruifeng <[email protected]>
Signed-off-by: zhengruifeng <[email protected]>
---
.../scala/org/apache/spark/ml/linalg/BLAS.scala | 2 +-
.../stat/distribution/MultivariateGaussian.scala | 32 +-
.../distribution/MultivariateGaussianSuite.scala | 10 +
.../spark/ml/clustering/GaussianMixture.scala | 324 ++++++++++++++++-----
.../spark/ml/clustering/GaussianMixtureSuite.scala | 11 +
python/pyspark/ml/clustering.py | 22 +-
6 files changed, 325 insertions(+), 76 deletions(-)
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 3d3e7a2..368f177 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -271,7 +271,7 @@ private[spark] object BLAS extends Serializable {
}
/**
- * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR.
+ * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR.
*
* @param U the upper triangular part of the matrix packed in an array (column major)
*/
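(For context, a plain-Scala reference sketch of what the packed ?SPR update computes; the real method delegates to a BLAS dspr kernel, so this is illustrative only:)

```scala
// U(i, j) += alpha * v(i) * v(j) for i <= j, where the upper triangle is
// packed column-major, so column j starts at offset j * (j + 1) / 2.
def sprReference(alpha: Double, v: Array[Double], U: Array[Double]): Unit = {
  val n = v.length
  var j = 0
  while (j < n) {
    val colStart = j * (j + 1) / 2
    val avj = alpha * v(j)
    var i = 0
    while (i <= j) {
      U(colStart + i) += avj * v(i)
      i += 1
    }
    j += 1
  }
}
```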
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
index 42746b5..a08b8af 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
@@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") (
*/
@transient private lazy val tuple = {
val (rootSigmaInv, u) = calculateCovarianceConstants
- val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv)
+ val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense
val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean)
(rootSigmaInvMat, u, rootSigmaInvMulMu)
}
@@ -81,6 +81,36 @@ class MultivariateGaussian @Since("2.0.0") (
u - 0.5 * BLAS.dot(v, v)
}
+ private[ml] def pdf(X: Matrix): DenseVector = {
+ val mat = DenseMatrix.zeros(X.numRows, X.numCols)
+ pdf(X, mat)
+ }
+
+ private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = {
+ require(!mat.isTransposed)
+
+ BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat)
+ val m = mat.numRows
+ val n = mat.numCols
+
+ val pdfVec = mat.multiply(rootSigmaInvMulMu)
+
+ val blas = BLAS.getBLAS(n)
+ val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1)
+
+ val localU = u
+ var i = 0
+ while (i < m) {
+ val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m)
+ val dot = pdfVec(i)
+ val squaredSum = squared1 + squared2 - dot - dot
+ pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum)
+ i += 1
+ }
+
+ pdfVec
+ }
+
/**
* Calculate distribution dependent components used for the density function:
* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))
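(A note on the batched `pdf(X: Matrix)` added above: writing A = rootSigmaInvMat and b = rootSigmaInvMulMu = A * mu, each row x_i satisfies

    logpdf(x_i) = u - 0.5 * |A * x_i - b|^2
                = u - 0.5 * (|b|^2 + |A * x_i|^2 - 2 * (A * x_i).t * b)

so one GEMM produces every A * x_i at once (the rows of `mat`), one GEMV produces every (A * x_i).t * b (`pdfVec`), and a strided ddot per row produces |A * x_i|^2; this is exactly the `squared1 + squared2 - dot - dot` in the loop.)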
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
index f2ecff1..8652d31 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
@@ -27,6 +27,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
test("univariate") {
val x1 = Vectors.dense(0.0)
val x2 = Vectors.dense(1.5)
+ val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0)
val sigma1 = Matrices.dense(1, 1, Array(1.0))
@@ -35,6 +36,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5)
assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)
+ assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5)
val sigma2 = Matrices.dense(1, 1, Array(4.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -42,11 +44,13 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5)
assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
+ assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5)
}
test("multivariate") {
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)
+ val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0, 0.0)
val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
@@ -55,6 +59,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5)
assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)
+ assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5)
val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -62,21 +67,25 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5)
assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
+ assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5)
}
test("multivariate degenerate") {
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)
+ val mat = Matrices.fromVectors(Seq(x1, x2))
val mu = Vectors.dense(0.0, 0.0)
val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
val dist = new MultivariateGaussian(mu, sigma)
assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
+ assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5)
}
test("SPARK-11302") {
val x = Vectors.dense(629, 640, 1.7188, 618.19)
+ val mat = Matrices.fromVectors(Seq(x))
val mu = Vectors.dense(
1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697)
val sigma = Matrices.dense(4, 4, Array(
@@ -87,5 +96,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
val dist = new MultivariateGaussian(mu, sigma)
// Agrees with R's dmvnorm: 7.154782e-05
assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9)
+ assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index 1c4560a..6d4137b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol
with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol
- with HasAggregationDepth {
+ with HasAggregationDepth with HasBlockSize {
/**
* Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
@@ -279,8 +279,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
* @param weights Weights for each Gaussian
* @return Probability (partial assignment) for each of the k clusters
*/
- private[clustering]
- def computeProbabilities(
+ private[clustering] def computeProbabilities(
features: Vector,
dists: Array[MultivariateGaussian],
weights: Array[Double]): Array[Double] = {
@@ -376,6 +375,25 @@ class GaussianMixture @Since("2.0.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
/**
+ * Set block size for stacking input data in matrices.
+ * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+ * if blockSize > 1, then vectors will be stacked into blocks, and high-level BLAS routines
+ * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+ * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
+ * on the sparsity and dimension of the input datasets, the underlying BLAS implementation
+ * (for example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (for example, the
+ * number of threads).
+ * Note that existing BLAS implementations are mainly optimized for dense matrices; if the
+ * input dataset is sparse, stacking may bring no performance gain and may even cause a
+ * performance regression.
+ * Default is 1.
+ *
+ * @group expertSetParam
+ */
+ @Since("3.1.0")
+ def setBlockSize(value: Int): this.type = set(blockSize, value)
+ setDefault(blockSize -> 1)
+
+ /**
* Number of samples per cluster to use when initializing Gaussians.
*/
private val numSamples = 5
@@ -392,7 +410,11 @@ class GaussianMixture @Since("2.0.0") (
s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of
the covariance" +
s" matrix is quadratic in the number of features.")
- val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
+ seed, tol, aggregationDepth, blockSize)
+ instr.logNumFeatures(numFeatures)
val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
col($(weightCol)).cast(DoubleType)
@@ -401,25 +423,50 @@ class GaussianMixture @Since("2.0.0") (
}
val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w)
- .as[(Vector, Double)]
- .rdd
+ .as[(Vector, Double)].rdd
+ .setName("training instances")
- if (handlePersistence) {
+ if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}
- val sc = spark.sparkContext
- val numClusters = $(k)
+ // TODO: SPARK-15785 Support users supplied initial GMM.
+ val (weights, gaussians) = initRandom(instances, $(k), numFeatures)
- instr.logPipelineStage(this)
- instr.logDataset(dataset)
- instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
- seed, tol, aggregationDepth)
- instr.logNumFeatures(numFeatures)
+ val (logLikelihood, iteration) = if ($(blockSize) == 1) {
+ trainOnRows(instances, weights, gaussians, numFeatures, instr)
+ } else {
+ val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean()
+ instr.logNamedValue("sparsity", sparsity.toString)
+ if (sparsity > 0.5) {
+ logWarning(s"sparsity of input dataset is $sparsity, " +
+ s"which may hurt performance in high-level BLAS.")
+ }
+ trainOnBlocks(instances, weights, gaussians, numFeatures, instr)
+ }
+ if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
- // TODO: SPARK-15785 Support users supplied initial GMM.
- val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
+ val gaussianDists = gaussians.map { case (mean, covVec) =>
+ val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+ new MultivariateGaussian(mean, cov)
+ }
+
+ val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists))
+ .setParent(this)
+ val summary = new GaussianMixtureSummary(model.transform(dataset),
+ $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
+ instr.logNamedValue("logLikelihood", logLikelihood)
+ instr.logNamedValue("clusterSizes", summary.clusterSizes)
+ model.setSummary(Some(summary))
+ }
+ private def trainOnRows(
+ instances: RDD[(Vector, Double)],
+ weights: Array[Double],
+ gaussians: Array[(DenseVector, DenseVector)],
+ numFeatures: Int,
+ instr: Instrumentation): (Double, Int) = {
+ val sc = instances.sparkContext
var logLikelihood = Double.MinValue
var logLikelihoodPrev = 0.0
@@ -440,7 +487,7 @@ class GaussianMixture @Since("2.0.0") (
val ws = agg.weights.sum
if (iteration == 0) weightSumAccum.add(ws)
logLikelihoodAccum.add(agg.logLikelihood)
- Iterator.tabulate(numClusters) { i =>
+ Iterator.tabulate(bcWeights.value.length) { i =>
(i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
}
} else Iterator.empty
@@ -471,21 +518,77 @@ class GaussianMixture @Since("2.0.0") (
instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
iteration += 1
}
- if (handlePersistence) {
- instances.unpersist()
- }
- val gaussianDists = gaussians.map { case (mean, covVec) =>
- val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
- new MultivariateGaussian(mean, cov)
+ (logLikelihood, iteration)
+ }
+
+ private def trainOnBlocks(
+ instances: RDD[(Vector, Double)],
+ weights: Array[Double],
+ gaussians: Array[(DenseVector, DenseVector)],
+ numFeatures: Int,
+ instr: Instrumentation): (Double, Int) = {
+ val blocks = instances.mapPartitions { iter =>
+ iter.grouped($(blockSize))
+ .map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
+ }.persist(StorageLevel.MEMORY_AND_DISK)
+ .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+ val sc = instances.sparkContext
+ var logLikelihood = Double.MinValue
+ var logLikelihoodPrev = 0.0
+
+ var iteration = 0
+ while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
+ val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null
+ val logLikelihoodAccum = sc.doubleAccumulator
+ val bcWeights = sc.broadcast(weights)
+ val bcGaussians = sc.broadcast(gaussians)
+
+ // aggregate the cluster contribution for all sample points,
+ // and then compute the new distributions
+ blocks.mapPartitions { iter =>
+ if (iter.nonEmpty) {
+ val agg = new BlockExpectationAggregator(numFeatures,
+ $(blockSize), bcWeights, bcGaussians)
+ while (iter.hasNext) { agg.add(iter.next) }
+ // sum of weights in this partition
+ val ws = agg.weights.sum
+ if (iteration == 0) weightSumAccum.add(ws)
+ logLikelihoodAccum.add(agg.logLikelihood)
+ agg.meanIter.zip(agg.covIter).zipWithIndex
+ .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
+ } else Iterator.empty
+ }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
+ // update the weights, means and covariances for i-th distributions
+ BLAS.axpy(1.0, mean2, mean1)
+ BLAS.axpy(1.0, cov2, cov1)
+ (mean1, cov1, w1 + w2, ws1 + ws2)
+ }.mapValues { case (mean, cov, w, ws) =>
+ // Create new distributions based on the partial assignments
+ // (often referred to as the "M" step in literature)
+ GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
+ }.collect().foreach { case (i, (weight, gaussian)) =>
+ weights(i) = weight
+ gaussians(i) = gaussian
+ }
+
+ bcWeights.destroy()
+ bcGaussians.destroy()
+
+ if (iteration == 0) {
+ instr.logNumExamples(weightSumAccum.count)
+ instr.logSumOfWeights(weightSumAccum.value)
+ }
+
+ logLikelihoodPrev = logLikelihood // current becomes previous
+ logLikelihood = logLikelihoodAccum.value // this is the freshly computed log-likelihood
+ instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
+ iteration += 1
}
+ blocks.unpersist()
- val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
- val summary = new GaussianMixtureSummary(model.transform(dataset),
- $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
- instr.logNamedValue("logLikelihood", logLikelihood)
- instr.logNamedValue("clusterSizes", summary.clusterSizes)
- model.setSummary(Some(summary))
+ (logLikelihood, iteration)
}
@Since("2.0.0")
@@ -626,16 +729,15 @@ private class ExpectationAggregator(
bcWeights: Broadcast[Array[Double]],
bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
- private val k: Int = bcWeights.value.length
- private var totalCnt: Long = 0L
- private var newLogLikelihood: Double = 0.0
- private lazy val newWeights: Array[Double] = Array.ofDim[Double](k)
- private lazy val newMeans: Array[DenseVector] = Array.fill(k)(
- new DenseVector(Array.ofDim[Double](numFeatures)))
- private lazy val newCovs: Array[DenseVector] = Array.fill(k)(
- new DenseVector(Array.ofDim[Double](numFeatures * (numFeatures + 1) / 2)))
+ private val k = bcWeights.value.length
+ private var totalCnt = 0L
+ private var newLogLikelihood = 0.0
+ private val covSize = numFeatures * (numFeatures + 1) / 2
+ private lazy val newWeights = Array.ofDim[Double](k)
+ @transient private lazy val newMeans = Array.fill(k)(Vectors.zeros(numFeatures).toDense)
+ @transient private lazy val newCovs = Array.fill(k)(Vectors.zeros(covSize).toDense)
- @transient private lazy val oldGaussians = {
+ @transient private lazy val gaussians = {
bcGaussians.value.map { case (mean, covVec) =>
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
new MultivariateGaussian(mean, cov)
@@ -656,19 +758,19 @@ private class ExpectationAggregator(
* Add a new training instance to this ExpectationAggregator, update the weights,
* means and covariances for each distributions, and update the log likelihood.
*
- * @param weightedVector The instance of data point to be added.
+ * @param instance The instance of data point to be added.
* @return This ExpectationAggregator object.
*/
- def add(weightedVector: (Vector, Double)): this.type = {
- val (instance: Vector, weight: Double) = weightedVector
+ def add(instance: (Vector, Double)): this.type = {
+ val (vector: Vector, weight: Double) = instance
val localWeights = bcWeights.value
- val localOldGaussians = oldGaussians
+ val localGaussians = gaussians
val prob = new Array[Double](k)
var probSum = 0.0
var i = 0
while (i < k) {
- val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
+ val p = EPSILON + localWeights(i) * localGaussians(i).pdf(vector)
prob(i) = p
probSum += p
i += 1
@@ -682,42 +784,128 @@ private class ExpectationAggregator(
while (i < k) {
val w = prob(i) / probSum * weight
localNewWeights(i) += w
- BLAS.axpy(w, instance, localNewMeans(i))
- BLAS.spr(w, instance, localNewCovs(i))
+ BLAS.axpy(w, vector, localNewMeans(i))
+ BLAS.spr(w, vector, localNewCovs(i))
i += 1
}
totalCnt += 1
this
}
+}
+
+
+/**
+ * BlockExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
+ * in the mixture. Note only upper triangular part of the covariance
+ * matrix of each distribution is stored as dense vector (column major)
+ * in order to reduce shuffled data size.
+ */
+private class BlockExpectationAggregator(
+ numFeatures: Int,
+ blockSize: Int,
+ bcWeights: Broadcast[Array[Double]],
+ bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+ private val k = bcWeights.value.length
+ private var totalCnt = 0L
+ private var newLogLikelihood = 0.0
+ private val covSize = numFeatures * (numFeatures + 1) / 2
+ private lazy val newWeights = Array.ofDim[Double](k)
+ @transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k)
+ @transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k)
+ @transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k)
+ @transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
+ @transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense
+
+ @transient private lazy val gaussians = {
+ bcGaussians.value.map { case (mean, covVec) =>
+ val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+ new MultivariateGaussian(mean, cov)
+ }
+ }
+
+ def count: Long = totalCnt
+
+ def logLikelihood: Double = newLogLikelihood
+
+ def weights: Array[Double] = newWeights
+
+ def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense)
+
+ def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense)
/**
- * Merge another ExpectationAggregator, update the weights, means and covariances
- * for each distributions, and update the log likelihood.
- * (Note that it's in place merging; as a result, `this` object will be modified.)
+ * Add a new training instance block to this BlockExpectationAggregator, update the weights,
+ * means and covariances for each distributions, and update the log likelihood.
*
- * @param other The other ExpectationAggregator to be merged.
- * @return This ExpectationAggregator object.
+ * @param block The instance block of data point to be added.
+ * @return This BlockExpectationAggregator object.
*/
- def merge(other: ExpectationAggregator): this.type = {
- if (other.count != 0) {
- totalCnt += other.totalCnt
-
- val localThisNewWeights = this.newWeights
- val localOtherNewWeights = other.newWeights
- val localThisNewMeans = this.newMeans
- val localOtherNewMeans = other.newMeans
- val localThisNewCovs = this.newCovs
- val localOtherNewCovs = other.newCovs
- var i = 0
- while (i < k) {
- localThisNewWeights(i) += localOtherNewWeights(i)
- BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
- BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))
- i += 1
- }
- newLogLikelihood += other.newLogLikelihood
+ def add(block: (Matrix, Array[Double])): this.type = {
+ val (matrix: Matrix, weights: Array[Double]) = block
+ require(matrix.isTransposed)
+ val size = matrix.numRows
+ require(weights.length == size)
+
+ val blas1 = BLAS.getBLAS(size)
+ val blas2 = BLAS.getBLAS(k)
+
+ val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k)
+ require(!probMat.isTransposed)
+ java.util.Arrays.fill(probMat.values, EPSILON)
+
+ val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures)
+ var j = 0
+ while (j < k) {
+ val pdfVec = gaussians(j).pdf(matrix, pdfMat)
+ blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1)
+ j += 1
+ }
+
+ var i = 0
+ while (i < size) {
+ val weight = weights(i)
+ val probSum = blas2.dasum(k, probMat.values, i, size)
+ blas2.dscal(k, weight / probSum, probMat.values, i, size)
+ blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1)
+ newLogLikelihood += math.log(probSum) * weight
+ i += 1
+ }
+
+ BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat)
+
+ // compute the cov vector for each row vector
+ val covVec = auxiliaryCovVec
+ val covVecIter = matrix match {
+ case dm: DenseMatrix =>
+ Iterator.tabulate(size) { i =>
+ java.util.Arrays.fill(covVec.values, 0.0)
+ // when input block is dense, directly use nativeBLAS to avoid array copy
+ BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1,
+ covVec.values, 0)
+ covVec
+ }
+
+ case sm: SparseMatrix =>
+ sm.rowIter.map { vec =>
+ java.util.Arrays.fill(covVec.values, 0.0)
+ BLAS.spr(1.0, vec, covVec)
+ covVec
+ }
}
+
+ covVecIter.zipWithIndex.foreach { case (covVec, i) =>
+ BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1,
+ probMat.values, i, size, newCovsMat.values, 0, covSize)
+ }
+
+ totalCnt += size
+
this
}
}
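(To make the blocking step concrete, here is a minimal sketch of how trainOnBlocks groups weighted rows into (matrix, weights) blocks via `iter.grouped(blockSize)`. Note `Matrices.fromVectors` is package-private (`private[ml]`), so this only compiles inside the `org.apache.spark.ml` package; the data is made up for illustration:)

```scala
import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}

val blockSize = 2
// Three weighted rows; with blockSize = 2 they form a 2-row and a 1-row block.
val instances: Iterator[(Vector, Double)] = Iterator(
  (Vectors.dense(1.0, 0.0, 2.0), 1.0),
  (Vectors.dense(0.0, 3.0, 1.0), 1.0),
  (Vectors.dense(4.0, 1.0, 0.0), 2.0))

val blocks = instances.grouped(blockSize).map { seq =>
  (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray)
}

blocks.foreach { case (matrix, weights) =>
  // Each block holds up to blockSize rows; the last block may be smaller.
  println(s"${matrix.numRows} x ${matrix.numCols} block, weights = ${weights.mkString(", ")}")
}
```

Each such block is then fed to `BlockExpectationAggregator.add` above, which replaces per-row DOT/AXPY/SPR calls with one GEMM for the means and a rank-1 `dger` update per row for the packed covariances.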
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index b35f964..d848d5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -285,6 +285,17 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset,
model.getFeaturesCol, model.getProbabilityCol)
}
+
+ test("GMM on blocks") {
+ Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset =>
+ val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed)
+ val model = gmm.fit(dataset)
+ Seq(2, 4, 8, 16, 32).foreach { blockSize =>
+ val model2 = gmm.setBlockSize(blockSize).fit(dataset)
+ modelEquals(model, model2)
+ }
+ }
+ }
}
object GaussianMixtureSuite extends SparkFunSuite {
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 984ca41..8f0e9aa 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -98,7 +98,8 @@ class ClusteringSummary(JavaWrapper):
@inherit_doc
class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
- HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol):
+ HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol,
+ HasBlockSize):
"""
Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`.
@@ -243,6 +244,8 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
>>> gm.getMaxIter()
30
>>> model = gm.fit(df)
+ >>> model.getBlockSize()
+ 1
>>> model.getAggregationDepth()
2
>>> model.getFeaturesCol()
@@ -327,16 +330,16 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
@keyword_only
def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
- aggregationDepth=2, weightCol=None):
+ aggregationDepth=2, weightCol=None, blockSize=1):
"""
__init__(self, featuresCol="features", predictionCol="prediction", k=2, \
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
- aggregationDepth=2, weightCol=None)
+ aggregationDepth=2, weightCol=None, blockSize=1)
"""
super(GaussianMixture, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture", self.uid)
- self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2)
+ self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@@ -347,11 +350,11 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
@since("2.0.0")
def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
- aggregationDepth=2, weightCol=None):
+ aggregationDepth=2, weightCol=None, blockSize=1):
"""
setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
- aggregationDepth=2, weightCol=None)
+ aggregationDepth=2, weightCol=None, blockSize=1)
Sets params for GaussianMixture.
"""
@@ -421,6 +424,13 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
"""
return self._set(aggregationDepth=value)
+ @since("3.1.0")
+ def setBlockSize(self, value):
+ """
+ Sets the value of :py:attr:`blockSize`.
+ """
+ return self._set(blockSize=value)
+
class GaussianMixtureSummary(ClusteringSummary):
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]