Repository: spark
Updated Branches:
  refs/heads/branch-1.0 0b0b8954c -> 3bb5d2f8a


[SPARK-2412] CoalescedRDD throws exception with certain pref locs

If the first pass of CoalescedRDD does not find the target number of locations
AND the second pass finds new locations, an exception is thrown, because
groupHash has no entry for the new location and
"groupHash.get(nxt_replica).get" is therefore called on None.

The fix is just to add an ArrayBuffer to groupHash for that replica if it 
didn't already exist.

Author: Aaron Davidson <[email protected]>

Closes #1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
(cherry picked from commit 7c23c0dc3ed721c95690fc49f435d9de6952523c)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bb5d2f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bb5d2f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bb5d2f8

Branch: refs/heads/branch-1.0
Commit: 3bb5d2f8a08285989ea91c039adc7978fc2efae0
Parents: 0b0b895
Author: Aaron Davidson <[email protected]>
Authored: Thu Jul 17 01:01:14 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu Jul 17 01:01:25 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala     |  4 ++--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala    | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3bb5d2f8/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c45b759..e7221e3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2

http://git-wip-us.apache.org/repos/asf/spark/blob/3bb5d2f8/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 87bfce3..a8880d7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))

Reply via email to