Repository: spark
Updated Branches:
  refs/heads/master ce1041c38 -> af31335ad


[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle 
dependencies

This patch updates two pieces of logic related to the handling of key
orderings in shuffle dependencies:

- The Tungsten ShuffleManager falls back to the regular SortShuffleManager
whenever the shuffle dependency specifies a key ordering, but technically we
only need to fall back when an aggregator is also specified. This patch updates
the fallback logic accordingly, so the Tungsten optimizations can apply to more
workloads.
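
   A condensed sketch of the resulting check (based on
`UnsafeShuffleManager.canUseUnsafeShuffle` as it appears in the diff below;
logging and shuffle-id bookkeeping elided for brevity):

   ```scala
   // Inside private[spark] object UnsafeShuffleManager: decide whether the
   // Tungsten (unsafe) shuffle path can serve a given dependency.
   def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = {
     val ser = Serializer.getSerializer(dependency.serializer)
     if (!ser.supportsRelocationOfSerializedObjects) {
       false // the serializer must support relocation of serialized objects
     } else if (dependency.aggregator.isDefined) {
       false // map-side aggregation still requires falling back to SortShuffleManager
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       false // too many output partitions for the packed record pointers
     } else {
       true // a defined keyOrdering alone no longer forces a fallback
     }
   }
   ```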

- The SQL Exchange operator performs defensive copying of shuffle inputs when a 
key ordering is specified, but this is unnecessary. The copying was added to 
guard against cases where ExternalSorter would buffer non-serialized records in 
memory.  When ExternalSorter is configured without an aggregator, it uses the 
following logic to determine whether to buffer records in a serialized or 
deserialized format:

   ```scala
   private val useSerializedPairBuffer =
     ordering.isEmpty &&
       conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
       ser.supportsRelocationOfSerializedObjects
   ```

   The `newOrdering.nonEmpty` branch in `Exchange.needToCopyObjectsBeforeShuffle`,
removed by this patch, is not necessary:

   - The branch forced a copy even when sort-based shuffle was not in use, which
was unnecessary because only SortShuffleManager performs map-side sorting.
   - Map-side sorting during shuffle writing is only performed for shuffles
that do map-side aggregation as part of the shuffle (to see this, look at how
SortShuffleWriter constructs ExternalSorter; a condensed excerpt follows this
list). Since SQL never pushes aggregation into Spark's shuffle, we can
guarantee that both the aggregator and the ordering will be empty. Spark SQL
also always uses serializers that support relocation, so sort-based shuffle
will use the serialized pair buffer unless the user has explicitly disabled it
via the SparkConf feature-flag (see the snippet after this list). Therefore, I
think my optimization in Exchange should be safe.
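
   As referenced above, a condensed excerpt of how the Spark 1.4-era
SortShuffleWriter constructs ExternalSorter (trimmed here to the construction
itself; an ordering is only ever passed on the map-side-combine path):

   ```scala
   // Condensed from SortShuffleWriter.write:
   sorter = if (dep.mapSideCombine) {
     require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
     new ExternalSorter[K, V, C](
       dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
   } else {
     // Neither an aggregator nor an ordering is passed to the sorter; keys are
     // sorted on the reduce side if the operation requires it, which keeps the
     // serialized pair buffer eligible.
     new ExternalSorter[K, V, V](
       None, Some(dep.partitioner), None, dep.serializer)
   }
   ```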
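
   And the feature-flag escape hatch mentioned above, as a minimal sketch (flag
name taken from the `useSerializedPairBuffer` snippet earlier):

   ```scala
   import org.apache.spark.SparkConf

   // Explicitly opt out of the serialized pair buffer; with this set,
   // ExternalSorter buffers deserialized records even when the serializer
   // supports relocation, so the defensive copy would be needed again.
   val conf = new SparkConf().set("spark.shuffle.sort.serializeMapOutputs", "false")
   ```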

Author: Josh Rosen <[email protected]>

Closes #6773 from JoshRosen/SPARK-8319 and squashes the following commits:

7a14129 [Josh Rosen] Revise comments; add handler to guard against future 
ShuffleManager implementations
07bb2c9 [Josh Rosen] Update comment to clarify circumstances under which 
shuffle operates on serialized records
269089a [Josh Rosen] Avoid unnecessary copy in SQL Exchange
34e526e [Josh Rosen] Enable Tungsten shuffle for non-agg shuffles w/ key 
orderings


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af31335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af31335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af31335a

Branch: refs/heads/master
Commit: af31335adce13e1452ce1990496c9bfac9778b5c
Parents: ce1041c
Author: Josh Rosen <[email protected]>
Authored: Sat Jun 13 16:14:24 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sat Jun 13 16:14:24 2015 -0700

----------------------------------------------------------------------
 .../shuffle/unsafe/UnsafeShuffleManager.scala    |  3 ---
 .../unsafe/UnsafeShuffleManagerSuite.scala       | 19 ++++++++++---------
 .../apache/spark/sql/execution/Exchange.scala    | 19 +++++++++++--------
 3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index f2bfef3..df7bbd6 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an 
aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key 
ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > 
MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has 
more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")

http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
index a73e94e..6727934 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with 
Matchers {
       mapSideCombine = false
     )))
 
+    // Shuffles with key orderings are supported as long as no aggregator is 
specified
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
   }
 
   test("unsupported shuffle dependencies") {
@@ -100,14 +109,7 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with 
Matchers {
       mapSideCombine = false
     )))
 
-    // We do not support shuffles that perform any kind of aggregation or 
sorting of keys
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
+    // We do not support shuffles that perform aggregation
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
@@ -115,7 +117,6 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with 
Matchers {
       aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or 
sorting of keys
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,

http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index c9a1883..edc64a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
@@ -81,11 +82,7 @@ case class Exchange(
       shuffleManager.isInstanceOf[UnsafeShuffleManager]
     val bypassMergeThreshold = 
conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
     val serializeMapOutputs = 
conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-    if (newOrdering.nonEmpty) {
-      // If a new ordering is required, then records will be sorted with 
Spark's `ExternalSorter`,
-      // which requires a defensive copy.
-      true
-    } else if (sortBasedShuffleOn) {
+    if (sortBasedShuffleOn) {
       val bypassIsSupported = 
SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= 
bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of 
output partitions is
@@ -96,8 +93,11 @@ case class Exchange(
       } else if (serializeMapOutputs && 
serializer.supportsRelocationOfSerializedObjects) {
         // SPARK-4550 extended sort-based shuffle to serialize individual 
records prior to sorting
         // them. This optimization is guarded by a feature-flag and is only 
applied in cases where
-        // shuffle dependency does not specify an ordering and the record 
serializer has certain
-        // properties. If this optimization is enabled, we can safely avoid 
the copy.
+        // shuffle dependency does not specify an aggregator or ordering and 
the record serializer
+        // has certain properties. If this optimization is enabled, we can 
safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key 
orderings, so we only
+        // need to check whether the optimization is enabled and supported by 
our serializer.
         //
         // This optimization also applies to UnsafeShuffleManager (added in 
SPARK-7081).
         false
@@ -108,9 +108,12 @@ case class Exchange(
         // both cases, we must copy.
         true
       }
-    } else {
+    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
       // We're using hash-based shuffle, so we don't need to copy.
       false
+    } else {
+      // Catch-all case to safely handle any future ShuffleManager 
implementations.
+      true
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
