Repository: spark
Updated Branches:
  refs/heads/branch-1.1 c085011ca -> ce06d7f45


[SPARK-3001][MLLIB] Improve Spearman's correlation

The current implementation sorts each column individually; the same ranks can
be computed with a single global sort.

Results on a 32-node cluster (running time before and after this change):

m | n | prev | this
---|---|-------|-----
1000000 | 50 | 55s | 9s
10000000 | 50 | 97s | 76s
1000000 | 100  | 119s | 15s

Author: Xiangrui Meng <[email protected]>

Closes #1917 from mengxr/spearman and squashes the following commits:

4d5d262 [Xiangrui Meng] remove unused import
85c48de [Xiangrui Meng] minor updates
a048d0c [Xiangrui Meng] remove cache and set a limit to cachedIds
b98bb18 [Xiangrui Meng] add comments
0846e07 [Xiangrui Meng] first version

(cherry picked from commit 2e069ca6560bf7ab07bd019f9530b42f4fe45014)
Signed-off-by: Xiangrui Meng <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce06d7f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce06d7f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce06d7f4

Branch: refs/heads/branch-1.1
Commit: ce06d7f45bc551f6121c382b0833e01b8a83f636
Parents: c085011
Author: Xiangrui Meng <[email protected]>
Authored: Fri Aug 15 21:07:55 2014 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Fri Aug 15 21:08:08 2014 -0700

----------------------------------------------------------------------
 .../stat/correlation/SpearmanCorrelation.scala  | 120 +++++++------------
 1 file changed, 42 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce06d7f4/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 9bd0c2c..4a6c677 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
-import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
 
 /**
  * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   /**
    * Compute Spearman's correlation matrix S, for the input matrix, where S(i, 
j) is the
    * correlation between column i and j.
-   *
-   * Input RDD[Vector] should be cached or checkpointed if possible since it 
would be split into
-   * numCol RDD[Double]s, each of which sorted, and the joined back into a 
single RDD[Vector].
    */
   override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
-    val indexed = X.zipWithUniqueId()
-
-    val numCols = X.first.size
-    if (numCols > 50) {
-      logWarning("Computing the Spearman correlation matrix can be slow for 
large RDDs with more"
-        + " than 50 columns.")
-    }
-    val ranks = new Array[RDD[(Long, Double)]](numCols)
-
-    // Note: we use a for loop here instead of a while loop with a single 
index variable
-    // to avoid race condition caused by closure serialization
-    for (k <- 0 until numCols) {
-      val column = indexed.map { case (vector, index) => (vector(k), index) }
-      ranks(k) = getRanks(column)
+    // ((columnIndex, value), rowUid)
+    val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+      vec.toArray.view.zipWithIndex.map { case (v, j) =>
+        ((j, v), uid)
+      }
     }
-
-    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
-    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
-  }
-
-  /**
-   * Compute the ranks for elements in the input RDD, using the average method 
for ties.
-   *
-   * With the average method, elements with the same value receive the same 
rank that's computed
-   * by taking the average of their positions in the sorted list.
-   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
-   * Note that positions here are 0-indexed, instead of the 1-indexed as in 
the definition for
-   * ranks in the standard definition for Spearman's correlation. This does 
not affect the final
-   * results and is slightly more performant.
-   *
-   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
-   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
-   *         copied from the input RDD.
-   */
-  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
-    // Get elements' positions in the sorted list for computing average rank 
for duplicate values
-    val sorted = indexed.sortByKey().zipWithIndex()
-
-    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
-      // add an extra element to signify the end of the list so that flatMap 
can flush the last
-      // batch of duplicates
-      val end = -1L
-      val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, 
end), end))
-      val firstEntry = padded.next()
-      var lastVal = firstEntry._1._1
-      var firstRank = firstEntry._2.toDouble
-      val idBuffer = ArrayBuffer(firstEntry._1._2)
-      padded.flatMap { case ((v, id), rank) =>
-        if (v == lastVal && id != end) {
-          idBuffer += id
-          Iterator.empty
-        } else {
-          val entries = if (idBuffer.size == 1) {
-            Iterator((idBuffer(0), firstRank))
-          } else {
-            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
-            idBuffer.map(id => (id, averageRank))
-          }
-          lastVal = v
-          firstRank = rank
-          idBuffer.clear()
-          idBuffer += id
-          entries
+    // global sort by (columnIndex, value)
+    val sorted = colBased.sortByKey()
+    // assign global ranks (using average ranks for tied values)
+    val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+      var preCol = -1
+      var preVal = Double.NaN
+      var startRank = -1.0
+      var cachedUids = ArrayBuffer.empty[Long]
+      val flush: () => Iterable[(Long, (Int, Double))] = () => {
+        val averageRank = startRank + (cachedUids.size - 1) / 2.0
+        val output = cachedUids.map { uid =>
+          (uid, (preCol, averageRank))
         }
+        cachedUids.clear()
+        output
       }
+      iter.flatMap { case (((j, v), uid), rank) =>
+        // If we see a new value or cachedUids is too big, we flush ids with 
their average rank.
+        if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+          val output = flush()
+          preCol = j
+          preVal = v
+          startRank = rank
+          cachedUids += uid
+          output
+        } else {
+          cachedUids += uid
+          Iterator.empty
+        }
+      } ++ flush()
     }
-    ranks
-  }
-
-  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: 
RDD[Vector]): RDD[Vector] = {
-    val partitioner = new HashPartitioner(input.partitions.size)
-    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map {
-      case (_, values: Array[Iterable[_]]) =>
-        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
-        new DenseVector(doubles.flatten.toArray)
+    // Replace values in the input matrix by their ranks compared with values 
in the same column.
+    // Note that shifting all ranks in a column by a constant value doesn't 
affect result.
+    val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+      // sort by column index and then convert values to a vector
+      Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
     }
+    PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to