Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a8a8e0e87 -> 58e37028a


[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join

In Scala, `map` and `flatMap` of `Iterable` will copy the contents of 
`Iterable` to a new `Seq`. Such as,
```Scala
  val iterable = Seq(1, 2, 3).map(v => {
    println(v)
    v
  })
  println("Iterable map done")

  val iterator = Seq(1, 2, 3).iterator.map(v => {
    println(v)
    v
  })
  println("Iterator map done")
```
outputed
```
1
2
3
Iterable map done
Iterator map done
```
So we should use 'iterator' to reduce memory consumed by join.

Found by Johannes Simon in 
http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3C5BE70814-9D03-4F61-AE2C-0D63F2DE4446%40mail.de%3E

Author: zsxwing <zsxw...@gmail.com>

Closes #3671 from zsxwing/SPARK-4824 and squashes the following commits:

48ee7b9 [zsxwing] Remove the explicit types
95d59d6 [zsxwing] Add 'iterator' to reduce memory consumed by join

(cherry picked from commit c233ab3d8d75a33495298964fe73dbf7dd8fe305)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58e37028
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58e37028
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58e37028

Branch: refs/heads/branch-1.2
Commit: 58e37028a43883877a8f15bc2e0d3011a9ebd704
Parents: a8a8e0e
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Dec 22 14:26:28 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 22 14:26:44 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala     | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/58e37028/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0a0f0c3..e1e0c42 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -485,7 +485,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] 
= {
     this.cogroup(other, partitioner).flatMapValues( pair =>
-      for (v <- pair._1; w <- pair._2) yield (v, w)
+      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
     )
   }
 
@@ -498,9 +498,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, 
(V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._2.isEmpty) {
-        pair._1.map(v => (v, None))
+        pair._1.iterator.map(v => (v, None))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
       }
     }
   }
@@ -515,9 +515,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._1.isEmpty) {
-        pair._2.map(w => (None, w))
+        pair._2.iterator.map(w => (None, w))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
       }
     }
   }
@@ -533,9 +533,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues {
-      case (vs, Seq()) => vs.map(v => (Some(v), None))
-      case (Seq(), ws) => ws.map(w => (None, Some(w)))
-      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield 
(Some(v), Some(w))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to