Repository: spark
Updated Branches:
  refs/heads/master 37f963ac1 -> 88e6d7507


[SPARK-20484][MLLIB] Add documentation to ALS code

## What changes were proposed in this pull request?

This PR adds documentation to the ALS code.

## How was this patch tested?

Existing tests were used.

mengxr srowen

This contribution is my original work.  I have the license to work on this 
project under the Spark project’s open source license.

Author: Daniel Li <[email protected]>

Closes #17793 from danielyli/spark-20484.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88e6d750
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88e6d750
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88e6d750

Branch: refs/heads/master
Commit: 88e6d75072c23fa99d4df00d087d03d8c38e8c69
Parents: 37f963a
Author: Daniel Li <[email protected]>
Authored: Sun May 7 10:09:58 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Sun May 7 10:09:58 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/ml/recommendation/ALS.scala    | 236 ++++++++++++++++---
 1 file changed, 202 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88e6d750/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef72..1562bf1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging 
{
   /**
    * :: DeveloperApi ::
    * Implementation of the ALS algorithm.
+   *
+   * This implementation of the ALS factorization algorithm partitions the two 
sets of factors among
+   * Spark workers so as to reduce network communication by only sending one 
copy of each factor
+   * vector to each Spark worker on each iteration, and only if needed.  This 
is achieved by
+   * precomputing some information about the ratings matrix to determine which 
users require which
+   * item factors and vice versa.  See the Scaladoc for `InBlock` for a 
detailed explanation of how
+   * the precomputation is done.
+   *
+   * In addition, since each iteration of calculating the factor matrices 
depends on the known
+   * ratings, which are spread across Spark partitions, a naive implementation 
would incur
+   * significant network communication overhead between Spark workers, as the 
ratings RDD would be
+   * repeatedly shuffled during each iteration.  This implementation reduces 
that overhead by
+   * performing the shuffling operation up front, precomputing each 
partition's ratings dependencies
+   * and duplicating those values to the appropriate workers before starting 
iterations to solve for
+   * the factor matrices.  See the Scaladoc for `OutBlock` for a detailed 
explanation of how the
+   * precomputation is done.
+   *
+   * Note that the term "rating block" is a bit of a misnomer, as the ratings 
are not partitioned by
+   * contiguous blocks from the ratings matrix but by a hash function on the 
rating's location in
+   * the matrix.  If it helps you to visualize the partitions, it is easier to 
think of the term
+   * "block" as referring to a subset of an RDD containing the ratings rather 
than a contiguous
+   * submatrix of the ratings matrix.
    */
   @DeveloperApi
   def train[ID: ClassTag]( // scalastyle:ignore
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with 
Logging {
       checkpointInterval: Int = 10,
       seed: Long = 0L)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, 
Array[Float])]) = {
+
     require(!ratings.isEmpty(), s"No ratings available from $ratings")
     require(intermediateRDDStorageLevel != StorageLevel.NONE,
       "ALS is not designed to run without persisting intermediate RDDs.")
+
     val sc = ratings.sparkContext
+
+    // Precompute the rating dependencies of each partition
     val userPart = new ALSPartitioner(numUserBlocks)
     val itemPart = new ALSPartitioner(numItemBlocks)
-    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
-    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
-    val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
     val blockRatings = partitionRatings(ratings, userPart, itemPart)
       .persist(intermediateRDDStorageLevel)
     val (userInBlocks, userOutBlocks) =
       makeBlocks("user", blockRatings, userPart, itemPart, 
intermediateRDDStorageLevel)
-    // materialize blockRatings and user blocks
-    userOutBlocks.count()
+    userOutBlocks.count()    // materialize blockRatings and user blocks
     val swappedBlockRatings = blockRatings.map {
       case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, 
localRatings)) =>
         ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, 
localRatings))
     }
     val (itemInBlocks, itemOutBlocks) =
       makeBlocks("item", swappedBlockRatings, itemPart, userPart, 
intermediateRDDStorageLevel)
-    // materialize item blocks
-    itemOutBlocks.count()
+    itemOutBlocks.count()    // materialize item blocks
+
+    // Encoders for storing each user/item's partition ID and index within its 
partition using a
+    // single integer; used as an optimization
+    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
+    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
+
+    // These are the user and item factor matrices that, once trained, are 
multiplied together to
+    // estimate the rating matrix.  The two matrices are stored in RDDs, 
partitioned by column such
+    // that each factor column resides on the same Spark worker as its 
corresponding user or item.
     val seedGen = new XORShiftRandom(seed)
     var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
     var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
+
+    val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
+
     var previousCheckpointFile: Option[String] = None
     val shouldCheckpoint: Int => Boolean = (iter) =>
       sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % 
checkpointInterval == 0)
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
             logWarning(s"Cannot delete checkpoint file $file:", e)
         }
       }
+
     if (implicitPrefs) {
       for (iter <- 1 to maxIter) {
         
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
@@ -910,26 +944,154 @@ object ALS extends DefaultParamsReadable[ALS] with 
Logging {
   private type FactorBlock = Array[Array[Float]]
 
   /**
-   * Out-link block that stores, for each dst (item/user) block, which src 
(user/item) factors to
-   * send. For example, outLinkBlock(0) contains the local indices (not the 
original src IDs) of the
-   * src factors in this block to send to dst block 0.
+   * A mapping of the columns of the items factor matrix that are needed when 
calculating each row
+   * of the users factor matrix, and vice versa.
+   *
+   * Specifically, when calculating a user factor vector, since only those 
columns of the items
+   * factor matrix that correspond to the items that that user has rated are 
needed, we can avoid
+   * having to repeatedly copy the entire items factor matrix to each worker 
later in the algorithm
+   * by precomputing these dependencies for all users, storing them in an RDD 
of `OutBlock`s.  The
+   * items' dependencies on the columns of the users factor matrix are computed 
similarly.
+   *
+   * =Example=
+   *
+   * Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` 
would look like the
+   * following:
+   *
+   * {{{
+   *     userOutBlocks.collect() == Seq(
+   *       0 -> Array(Array(0, 1), Array(0, 1)),
+   *       1 -> Array(Array(0), Array(0))
+   *     )
+   * }}}
+   *
+   * Each value in this map-like sequence is of type `Array[Array[Int]]`.  The 
values in the
+   * inner array are the ranks of the sorted user IDs in that partition; so in 
the example above,
+   * `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all 
unique user IDs in
+   * partition 0 are sorted, 0 is the first ID and 6 is the second.  The 
position of each inner
+   * array in its enclosing outer array denotes the partition number to which 
item IDs map; in the
+   * example, the first `Array(0, 1)` is in position 0 of its outer array, 
denoting item IDs that
+   * map to partition 0.
+   *
+   * In summary, the data structure encodes the following information:
+   *
+   *   *  There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, 
where 0 and 1 are the
+   *   indices of the user IDs 0 and 6 on partition 0) whose item IDs map to 
partitions 0 and 1
+   *   (represented by the fact that `Array(0, 1)` appears in both the 0th and 
1st positions).
+   *
+   *   *  There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is 
the index of the user
+   *   ID 3 on partition 1) whose item IDs map to partitions 0 and 1 
(represented by the fact that
+   *   `Array(0)` appears in both the 0th and 1st positions).
    */
   private type OutBlock = Array[Array[Int]]
 
   /**
-   * In-link block for computing src (user/item) factors. This includes the 
original src IDs
-   * of the elements within this block as well as encoded dst (item/user) 
indices and corresponding
-   * ratings. The dst indices are in the form of (blockId, localIndex), which 
are not the original
-   * dst IDs. To compute src factors, we expect receiving dst factors that 
match the dst indices.
-   * For example, if we have an in-link record
+   * In-link block for computing user and item factor matrices.
+   *
+   * The ALS algorithm partitions the columns of the users factor matrix 
evenly among Spark workers.
+   * Since each column of the factor matrix is calculated using the known 
ratings of the
+   * corresponding user, and since the ratings don't change across iterations, 
the ALS algorithm
+   * preshuffles the ratings to the appropriate partitions, storing them in 
`InBlock` objects.
+   *
+   * The ratings shuffled by item ID are computed similarly and also stored in 
`InBlock` objects.
+   * Note that this means every rating is stored twice, once as shuffled by 
user ID and once by item
+   * ID.  This is a necessary tradeoff, since in general a rating will not be 
on the same worker
+   * when partitioned by user as by item.
+   *
+   * =Example=
+   *
+   * Say we have a small collection of eight items to offer the seven users in 
our application.  We
+   * have some known ratings given by the users, as seen in the matrix below:
+   *
+   * {{{
+   *                       Items
+   *            0   1   2   3   4   5   6   7
+   *          +---+---+---+---+---+---+---+---+
+   *        0 |   |0.1|   |   |0.4|   |   |0.7|
+   *          +---+---+---+---+---+---+---+---+
+   *        1 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *     U  2 |   |   |   |   |   |   |   |   |
+   *     s    +---+---+---+---+---+---+---+---+
+   *     e  3 |   |3.1|   |   |3.4|   |   |3.7|
+   *     r    +---+---+---+---+---+---+---+---+
+   *     s  4 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *        5 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *        6 |   |6.1|   |   |6.4|   |   |6.7|
+   *          +---+---+---+---+---+---+---+---+
+   * }}}
+   *
+   * The ratings are represented as an RDD, passed to the `partitionRatings` 
method as the `ratings`
+   * parameter:
+   *
+   * {{{
+   *     ratings.collect() == Seq(
+   *       Rating(0, 1, 0.1f),
+   *       Rating(0, 4, 0.4f),
+   *       Rating(0, 7, 0.7f),
+   *       Rating(3, 1, 3.1f),
+   *       Rating(3, 4, 3.4f),
+   *       Rating(3, 7, 3.7f),
+   *       Rating(6, 1, 6.1f),
+   *       Rating(6, 4, 6.4f),
+   *       Rating(6, 7, 6.7f)
+   *     )
+   * }}}
    *
-   * {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
+   * Say that we are using two partitions to calculate each factor matrix:
    *
-   * and assume that the dst factors are stored as dstFactors: Map[Int, 
Array[Array[Float]]], which
-   * is a blockId to dst factors map, the corresponding dst factor of the 
record is dstFactor(2)(3).
+   * {{{
+   *     val userPart = new ALSPartitioner(2)
+   *     val itemPart = new ALSPartitioner(2)
+   *     val blockRatings = partitionRatings(ratings, userPart, itemPart)
+   * }}}
    *
-   * We use a CSC-like (compressed sparse column) format to store the in-link 
information. So we can
-   * compute src factors one after another using only one normal equation 
instance.
+   * Ratings are mapped to partitions using the user/item IDs modulo the 
number of partitions.  With
+   * two partitions, ratings with even-valued user IDs are shuffled to 
partition 0 while those with
+   * odd-valued user IDs are shuffled to partition 1:
+   *
+   * {{{
+   *     userInBlocks.collect() == Seq(
+   *       0 -> Seq(
+   *              // Internally, the class stores the ratings in a more 
optimized format than
+   *              // a sequence of `Rating`s, but for clarity we show it as 
such here.
+   *              Rating(0, 1, 0.1f),
+   *              Rating(0, 4, 0.4f),
+   *              Rating(0, 7, 0.7f),
+   *              Rating(6, 1, 6.1f),
+   *              Rating(6, 4, 6.4f),
+   *              Rating(6, 7, 6.7f)
+   *            ),
+   *       1 -> Seq(
+   *              Rating(3, 1, 3.1f),
+   *              Rating(3, 4, 3.4f),
+   *              Rating(3, 7, 3.7f)
+   *            )
+   *     )
+   * }}}
+   *
+   * Similarly, ratings with even-valued item IDs are shuffled to partition 0 
while those with
+   * odd-valued item IDs are shuffled to partition 1:
+   *
+   * {{{
+   *     itemInBlocks.collect() == Seq(
+   *       0 -> Seq(
+   *              Rating(0, 4, 0.4f),
+   *              Rating(3, 4, 3.4f),
+   *              Rating(6, 4, 6.4f)
+   *            ),
+   *       1 -> Seq(
+   *              Rating(0, 1, 0.1f),
+   *              Rating(0, 7, 0.7f),
+   *              Rating(3, 1, 3.1f),
+   *              Rating(3, 7, 3.7f),
+   *              Rating(6, 1, 6.1f),
+   *              Rating(6, 7, 6.7f)
+   *            )
+   *     )
+   * }}}
    *
    * @param srcIds src ids (ordered)
    * @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) 
of dst indices and
@@ -1026,7 +1188,24 @@ object ALS extends DefaultParamsReadable[ALS] with 
Logging {
   }
 
   /**
-   * Partitions raw ratings into blocks.
+   * Groups an RDD of [[Rating]]s by the user partition and item partition to 
which each `Rating`
+   * maps according to the given partitioners.  The returned pair RDD holds 
the ratings, encoded in
+   * a memory-efficient format but otherwise unchanged, keyed by the (user 
partition ID, item
+   * partition ID) pair.
+   *
+   * Performance note: This is an expensive operation that performs an RDD 
shuffle.
+   *
+   * Implementation note: This implementation produces the same result as the 
following but
+   * generates fewer intermediate objects:
+   *
+   * {{{
+   *     ratings.map { r =>
+   *       ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
+   *     }.aggregateByKey(new RatingBlockBuilder)(
+   *         seqOp = (b, r) => b.add(r),
+   *         combOp = (b0, b1) => b0.merge(b1.build()))
+   *       .mapValues(_.build())
+   * }}}
    *
    * @param ratings raw ratings
    * @param srcPart partitioner for src IDs
@@ -1037,17 +1216,6 @@ object ALS extends DefaultParamsReadable[ALS] with 
Logging {
       ratings: RDD[Rating[ID]],
       srcPart: Partitioner,
       dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
-
-     /* The implementation produces the same result as the following but 
generates less objects.
-
-     ratings.map { r =>
-       ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
-     }.aggregateByKey(new RatingBlockBuilder)(
-         seqOp = (b, r) => b.add(r),
-         combOp = (b0, b1) => b0.merge(b1.build()))
-       .mapValues(_.build())
-     */
-
     val numPartitions = srcPart.numPartitions * dstPart.numPartitions
     ratings.mapPartitions { iter =>
       val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
@@ -1135,8 +1303,8 @@ object ALS extends DefaultParamsReadable[ALS] with 
Logging {
     def length: Int = srcIds.length
 
     /**
-     * Compresses the block into an [[InBlock]]. The algorithm is the same as 
converting a
-     * sparse matrix from coordinate list (COO) format into compressed sparse 
column (CSC) format.
+     * Compresses the block into an `InBlock`. The algorithm is the same as 
converting a sparse
+     * matrix from coordinate list (COO) format into compressed sparse column 
(CSC) format.
      * Sorting is done using Spark's built-in Timsort to avoid generating too 
many objects.
      */
     def compress(): InBlock[ID] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to