Repository: spark
Updated Branches:
refs/heads/master ce1041c38 -> af31335ad
[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies
This patch updates two pieces of logic related to the handling of key orderings
in shuffle dependencies:
- The Tungsten ShuffleManager falls back to the regular SortShuffleManager
whenever the shuffle dependency specifies a key ordering, but we actually only
need to fall back when an aggregator is also specified. This patch updates the
fallback logic accordingly so that the Tungsten optimizations apply to more
workloads; a condensed sketch of the resulting check appears after these notes.
- The SQL Exchange operator performs defensive copying of shuffle inputs when a
key ordering is specified, but this is unnecessary. The copying was added to
guard against cases where ExternalSorter would buffer non-serialized records in
memory. When ExternalSorter is configured without an aggregator, it uses the
following logic to determine whether to buffer records in a serialized or
deserialized format:
```scala
private val useSerializedPairBuffer =
  ordering.isEmpty &&
    conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
    ser.supportsRelocationOfSerializedObjects
```
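As an illustration, here is a minimal, self-contained sketch of the same
decision (the object and parameter names are hypothetical, not part of this
patch): with no ordering and a relocation-capable serializer, the serialized
buffer is selected unless the feature flag is disabled.
```scala
// Hypothetical standalone restatement of ExternalSorter's buffering decision;
// the object name and parameters are illustrative only.
import org.apache.spark.SparkConf

object SerializedBufferSketch {
  def useSerializedPairBuffer(
      orderingDefined: Boolean,
      conf: SparkConf,
      serializerSupportsRelocation: Boolean): Boolean = {
    !orderingDefined &&
      conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
      serializerSupportsRelocation
  }

  def main(args: Array[String]): Unit = {
    // Exchange sets no key ordering and Spark SQL's serializers support
    // relocation, so this prints "true" unless the flag is turned off.
    println(useSerializedPairBuffer(
      orderingDefined = false,
      conf = new SparkConf(),
      serializerSupportsRelocation = true))
  }
}
```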
The `newOrdering.nonEmpty` branch in `Exchange.needToCopyObjectsBeforeShuffle`,
removed by this patch, is not necessary:
- It was evaluated even when sort-based shuffle was not in use, which was
pointless because only SortShuffleManager performs map-side sorting.
- Map-side sorting during shuffle writing is only performed for shuffles that
perform map-side aggregation as part of the shuffle (to see this, look at how
SortShuffleWriter constructs ExternalSorter). Since SQL never pushes
aggregation into Spark's shuffle, both the aggregator and the ordering are
guaranteed to be empty. Spark SQL also always uses serializers that support
relocation, so sort-based shuffle will use the serialized pair buffer unless
the user has explicitly disabled it via the SparkConf feature flag. Therefore,
my optimization in Exchange should be safe.
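To make the first change concrete, below is a condensed, illustrative sketch of
the eligibility check as it stands after this patch. The `ShuffleProfile`
holder and the partition-count constant are assumptions for the example; the
real `canUseUnsafeShuffle` inspects a ShuffleDependency and logs the reason for
each fallback, as shown in the diff below.
```scala
// Illustrative-only sketch of the post-patch canUseUnsafeShuffle conditions.
object TungstenEligibilitySketch {
  // Assumed limit: the Tungsten writer encodes partition ids in 24 bits.
  val MaxShuffleOutputPartitions: Int = 1 << 24

  // Hypothetical summary of the relevant ShuffleDependency properties.
  case class ShuffleProfile(
      serializerSupportsRelocation: Boolean,
      hasAggregator: Boolean,
      numPartitions: Int)

  def canUseUnsafeShuffle(p: ShuffleProfile): Boolean = {
    p.serializerSupportsRelocation &&
      !p.hasAggregator && // a key ordering alone no longer forces a fallback
      p.numPartitions <= MaxShuffleOutputPartitions
  }

  def main(args: Array[String]): Unit = {
    // A shuffle with a key ordering but no aggregator is now eligible.
    println(canUseUnsafeShuffle(ShuffleProfile(
      serializerSupportsRelocation = true,
      hasAggregator = false,
      numPartitions = 2))) // prints: true
  }
}
```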
Author: Josh Rosen <[email protected]>
Closes #6773 from JoshRosen/SPARK-8319 and squashes the following commits:
7a14129 [Josh Rosen] Revise comments; add handler to guard against future ShuffleManager implementations
07bb2c9 [Josh Rosen] Update comment to clarify circumstances under which shuffle operates on serialized records
269089a [Josh Rosen] Avoid unnecessary copy in SQL Exchange
34e526e [Josh Rosen] Enable Tungsten shuffle for non-agg shuffles w/ key orderings
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af31335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af31335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af31335a
Branch: refs/heads/master
Commit: af31335adce13e1452ce1990496c9bfac9778b5c
Parents: ce1041c
Author: Josh Rosen <[email protected]>
Authored: Sat Jun 13 16:14:24 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sat Jun 13 16:14:24 2015 -0700
----------------------------------------------------------------------
.../shuffle/unsafe/UnsafeShuffleManager.scala | 3 ---
.../unsafe/UnsafeShuffleManagerSuite.scala | 19 ++++++++++---------
.../apache/spark/sql/execution/Exchange.scala | 19 +++++++++++--------
3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index f2bfef3..df7bbd6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
index a73e94e..6727934 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
+    // Shuffles with key orderings are supported as long as no aggregator is specified
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
   }
 
   test("unsupported shuffle dependencies") {
@@ -100,14 +109,7 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
+    // We do not support shuffles that perform aggregation
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
@@ -115,7 +117,6 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
http://git-wip-us.apache.org/repos/asf/spark/blob/af31335a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index c9a1883..edc64a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
@@ -81,11 +82,7 @@ case class Exchange(
       shuffleManager.isInstanceOf[UnsafeShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
     val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-    if (newOrdering.nonEmpty) {
-      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
-      // which requires a defensive copy.
-      true
-    } else if (sortBasedShuffleOn) {
+    if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
@@ -96,8 +93,11 @@ case class Exchange(
       } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
         // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
         // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an ordering and the record serializer has certain
-        // properties. If this optimization is enabled, we can safely avoid the copy.
+        // shuffle dependency does not specify an aggregator or ordering and the record serializer
+        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
+        // need to check whether the optimization is enabled and supported by our serializer.
         //
         // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
@@ -108,9 +108,12 @@ case class Exchange(
         // both cases, we must copy.
         true
       }
-    } else {
+    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
       // We're using hash-based shuffle, so we don't need to copy.
       false
+    } else {
+      // Catch-all case to safely handle any future ShuffleManager implementations.
+      true
     }
   }