Repository: spark Updated Branches: refs/heads/master 9fe412521 -> 30363ede8
[MLlib] [SPARK-6713] Iterators in columnSimilarities for mapPartitionsWithIndex Use Iterators in columnSimilarities so that the pairs emitted inside mapPartitionsWithIndex can be spilled to disk. This matters for large, dense inputs: previously all pairs for a row were first collected into a single in-memory ListBuffer, whereas now they are produced lazily, so Spark can spill them to disk instead of holding them all in memory before they are handed to Spark. A follow-up PR will update the documentation. Author: Reza Zadeh <[email protected]> Closes #5364 from rezazadeh/optmemsim and squashes the following commits: 47c90ba [Reza Zadeh] Iterators in columnSimilarities for flatMap Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30363ede Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30363ede Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30363ede Branch: refs/heads/master Commit: 30363ede8635f2548e444697dbcf60a795b61a84 Parents: 9fe4125 Author: Reza Zadeh <[email protected]> Authored: Mon Apr 6 13:15:01 2015 -0700 Committer: Xiangrui Meng <[email protected]> Committed: Mon Apr 6 13:15:01 2015 -0700 ---------------------------------------------------------------------- .../mllib/linalg/distributed/RowMatrix.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/30363ede/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 9611115..9a89a6f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -531,7 +531,6 @@ class RowMatrix( val rand = new XORShiftRandom(indx) val scaled = new 
Array[Double](p.size) iter.flatMap { row => - val buf = new ListBuffer[((Int, Int), Double)]() row match { case SparseVector(size, indices, values) => val nnz = indices.size @@ -540,8 +539,9 @@ class RowMatrix( scaled(k) = values(k) / q(indices(k)) k += 1 } - k = 0 - while (k < nnz) { + + Iterator.tabulate (nnz) { k => + val buf = new ListBuffer[((Int, Int), Double)]() val i = indices(k) val iVal = scaled(k) if (iVal != 0 && rand.nextDouble() < p(i)) { @@ -555,8 +555,8 @@ class RowMatrix( l += 1 } } - k += 1 - } + buf + }.flatten case DenseVector(values) => val n = values.size var i = 0 @@ -564,8 +564,8 @@ class RowMatrix( scaled(i) = values(i) / q(i) i += 1 } - i = 0 - while (i < n) { + Iterator.tabulate (n) { i => + val buf = new ListBuffer[((Int, Int), Double)]() val iVal = scaled(i) if (iVal != 0 && rand.nextDouble() < p(i)) { var j = i + 1 @@ -577,10 +577,9 @@ class RowMatrix( j += 1 } } - i += 1 - } + buf + }.flatten } - buf } }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i.toLong, j.toLong, sim) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
