Repository: spark Updated Branches: refs/heads/branch-1.2 26dfac6e9 -> 51081e42b
[SPARK-4691][shuffle] Restructure a few lines in shuffle code In HashShuffleReader.scala and HashShuffleWriter.scala, no need to judge "dep.aggregator.isEmpty" again as this is judged by "dep.aggregator.isDefined" In SortShuffleWriter.scala, "dep.aggregator.isEmpty" is better than "!dep.aggregator.isDefined" ? Author: maji2014 <ma...@asiainfo.com> Closes #3553 from maji2014/spark-4691 and squashes the following commits: bf7b14d [maji2014] change a elegant way for SortShuffleWriter.scala 10d0cf0 [maji2014] change a elegant way d8f52dc [maji2014] code optimization for judgement (cherry picked from commit b31074466a83d3d1387fc1e4337dfab9e164fc04) Signed-off-by: Josh Rosen <joshro...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51081e42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51081e42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51081e42 Branch: refs/heads/branch-1.2 Commit: 51081e42b1424c6bda166c42f8261acba7816759 Parents: 26dfac6 Author: maji2014 <ma...@asiainfo.com> Authored: Tue Dec 9 13:13:12 2014 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Wed Dec 17 12:15:24 2014 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 4 ++-- .../scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 3 +-- .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 4 +--- 3 files changed, 4 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 5baf45d..de72148 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C]( } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") + // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 183a303..755f17d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V]( } else { records } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } http://git-wip-us.apache.org/repos/asf/spark/blob/51081e42/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index d75f9d7..27496c5 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { - if (!dep.aggregator.isDefined) { - throw new IllegalStateException("Aggregator is empty for map-side combine") - } + require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org