Repository: spark Updated Branches: refs/heads/master b8f88d327 -> f86a89a2e
[SPARK-5714][Mllib] Refactor initial step of LDA to remove redundant operations The `initialState` of LDA performs several RDD operations that looks redundant. This pr tries to simplify these operations. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #4501 from viirya/sim_lda and squashes the following commits: 4870fe4 [Liang-Chi Hsieh] For comments. 9af1487 [Liang-Chi Hsieh] Refactor initial step of LDA to remove redundant operations. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f86a89a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f86a89a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f86a89a2 Branch: refs/heads/master Commit: f86a89a2e081ee4593ce03398c2283fd77daac6e Parents: b8f88d3 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Tue Feb 10 21:51:15 2015 -0800 Committer: Xiangrui Meng <m...@databricks.com> Committed: Tue Feb 10 21:51:15 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/mllib/clustering/LDA.scala | 37 +++++++------------- 1 file changed, 13 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f86a89a2/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index a1d3df0..5e17c8d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -450,34 +450,23 @@ private[clustering] object LDA { // Create vertices. // Initially, we use random soft assignments of tokens to topics (random gamma). - val edgesWithGamma: RDD[(Edge[TokenCount], TopicCounts)] = - edges.mapPartitionsWithIndex { case (partIndex, partEdges) => - val random = new Random(partIndex + randomSeed) - partEdges.map { edge => - // Create a random gamma_{wjk} - (edge, normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)) + def createVertices(): RDD[(VertexId, TopicCounts)] = { + val verticesTMP: RDD[(VertexId, TopicCounts)] = + edges.mapPartitionsWithIndex { case (partIndex, partEdges) => + val random = new Random(partIndex + randomSeed) + partEdges.flatMap { edge => + val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) + val sum = gamma * edge.attr + Seq((edge.srcId, sum), (edge.dstId, sum)) + } } - } - def createVertices(sendToWhere: Edge[TokenCount] => VertexId): RDD[(VertexId, TopicCounts)] = { - val verticesTMP: RDD[(VertexId, (TokenCount, TopicCounts))] = - edgesWithGamma.map { case (edge, gamma: TopicCounts) => - (sendToWhere(edge), (edge.attr, gamma)) - } - verticesTMP.aggregateByKey(BDV.zeros[Double](k))( - (sum, t) => { - brzAxpy(t._1, t._2, sum) - sum - }, - (sum0, sum1) => { - sum0 += sum1 - } - ) + verticesTMP.reduceByKey(_ + _) } - val docVertices = createVertices(_.srcId) - val termVertices = createVertices(_.dstId) + + val docTermVertices = createVertices() // Partition such that edges are grouped by document - val graph = Graph(docVertices ++ termVertices, edges) + val graph = Graph(docTermVertices, edges) .partitionBy(PartitionStrategy.EdgePartition1D) new EMOptimizer(graph, k, vocabSize, docConcentration, topicConcentration, checkpointInterval) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org