Repository: spark
Updated Branches:
  refs/heads/master 9fe412521 -> 30363ede8


[MLlib] [SPARK-6713] Iterators in columnSimilarities for mapPartitionsWithIndex

Use Iterators in columnSimilarities to allow mapPartitionsWithIndex to spill to 
disk. This could happen in a dense and large column - this way Spark can spill 
the pairs onto disk instead of building all the pairs before handing them to 
Spark.

Another PR coming to update documentation.

Author: Reza Zadeh <[email protected]>

Closes #5364 from rezazadeh/optmemsim and squashes the following commits:

47c90ba [Reza Zadeh] Iterators in columnSimilarities for flatMap


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30363ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30363ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30363ede

Branch: refs/heads/master
Commit: 30363ede8635f2548e444697dbcf60a795b61a84
Parents: 9fe4125
Author: Reza Zadeh <[email protected]>
Authored: Mon Apr 6 13:15:01 2015 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Mon Apr 6 13:15:01 2015 -0700

----------------------------------------------------------------------
 .../mllib/linalg/distributed/RowMatrix.scala     | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30363ede/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 9611115..9a89a6f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -531,7 +531,6 @@ class RowMatrix(
       val rand = new XORShiftRandom(indx)
       val scaled = new Array[Double](p.size)
       iter.flatMap { row =>
-        val buf = new ListBuffer[((Int, Int), Double)]()
         row match {
           case SparseVector(size, indices, values) =>
             val nnz = indices.size
@@ -540,8 +539,9 @@ class RowMatrix(
               scaled(k) = values(k) / q(indices(k))
               k += 1
             }
-            k = 0
-            while (k < nnz) {
+
+            Iterator.tabulate (nnz) { k =>
+              val buf = new ListBuffer[((Int, Int), Double)]()
               val i = indices(k)
               val iVal = scaled(k)
               if (iVal != 0 && rand.nextDouble() < p(i)) {
@@ -555,8 +555,8 @@ class RowMatrix(
                   l += 1
                 }
               }
-              k += 1
-            }
+              buf
+            }.flatten
           case DenseVector(values) =>
             val n = values.size
             var i = 0
@@ -564,8 +564,8 @@ class RowMatrix(
               scaled(i) = values(i) / q(i)
               i += 1
             }
-            i = 0
-            while (i < n) {
+            Iterator.tabulate (n) { i =>
+              val buf = new ListBuffer[((Int, Int), Double)]()
               val iVal = scaled(i)
               if (iVal != 0 && rand.nextDouble() < p(i)) {
                 var j = i + 1
@@ -577,10 +577,9 @@ class RowMatrix(
                   j += 1
                 }
               }
-              i += 1
-            }
+              buf
+            }.flatten
         }
-        buf
       }
     }.reduceByKey(_ + _).map { case ((i, j), sim) =>
       MatrixEntry(i.toLong, j.toLong, sim)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to