Repository: spark
Updated Branches:
  refs/heads/branch-1.2 26dfac6e9 -> 51081e42b


 [SPARK-4691][shuffle] Restructure a few lines in shuffle code

In HashShuffleReader.scala and HashShuffleWriter.scala, there is no need to check 
"dep.aggregator.isEmpty" again, as this case is already covered by the preceding "dep.aggregator.isDefined" check.

In SortShuffleWriter.scala, "dep.aggregator.isEmpty" reads more clearly than 
"!dep.aggregator.isDefined".

Author: maji2014 <ma...@asiainfo.com>

Closes #3553 from maji2014/spark-4691 and squashes the following commits:

bf7b14d [maji2014] change to a more elegant way for SortShuffleWriter.scala
10d0cf0 [maji2014] change to a more elegant way
d8f52dc [maji2014] code optimization for judgement

(cherry picked from commit b31074466a83d3d1387fc1e4337dfab9e164fc04)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51081e42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51081e42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51081e42

Branch: refs/heads/branch-1.2
Commit: 51081e42b1424c6bda166c42f8261acba7816759
Parents: 26dfac6
Author: maji2014 <ma...@asiainfo.com>
Authored: Tue Dec 9 13:13:12 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Wed Dec 17 12:15:24 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala  | 4 ++--
 .../scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala  | 3 +--
 .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala  | 4 +---
 3 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 5baf45d..de72148 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C](
       } else {
         new InterruptibleIterator(context, 
dep.aggregator.get.combineValuesByKey(iter, context))
       }
-    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
-      throw new IllegalStateException("Aggregator is empty for map-side 
combine")
     } else {
+      require(!dep.mapSideCombine, "Map-side combine without Aggregator 
specified!")
+
       // Convert the Product2s to pairs since this is what downstream RDDs 
currently expect
       iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, 
pair._2))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 183a303..755f17d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V](
       } else {
         records
       }
-    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
-      throw new IllegalStateException("Aggregator is empty for map-side 
combine")
     } else {
+      require(!dep.mapSideCombine, "Map-side combine without Aggregator 
specified!")
       records
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index d75f9d7..27496c5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
     if (dep.mapSideCombine) {
-      if (!dep.aggregator.isDefined) {
-        throw new IllegalStateException("Aggregator is empty for map-side 
combine")
-      }
+      require(dep.aggregator.isDefined, "Map-side combine without Aggregator 
specified!")
       sorter = new ExternalSorter[K, V, C](
         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
       sorter.insertAll(records)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to