Repository: spark
Updated Branches:
  refs/heads/master 31bd1dab1 -> b375397b1


[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1)

## What changes were proposed in this pull request?

In `ShuffleExchangeExec`, we don't need to insert an extra local sort before
round-robin partitioning if the new partitioning has only 1 partition, because
in that case all output rows go to the same partition.
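A minimal Scala sketch of why the sort is redundant for a single partition. It is not Spark's implementation: `RoundRobinSketch`, `needsLocalSort`, and `roundRobin` are hypothetical names. The sketch models round-robin assignment as sending the i-th row, in iteration order, to partition `(start + i) % numPartitions`, where `start` is just a parameter standing in for the starting position. It mirrors the guard this commit adds to `ShuffleExchangeExec`.

```scala
// Hypothetical model, not Spark's API.
object RoundRobinSketch {
  // Mirrors the condition in this commit: sort only when the config is on,
  // there is more than one target partition, and the partitioning is round-robin.
  def needsLocalSort(sortBeforeRepartition: Boolean,
                     numPartitions: Int,
                     isRoundRobin: Boolean): Boolean =
    sortBeforeRepartition && numPartitions > 1 && isRoundRobin

  // Sends the i-th row (in iteration order) to partition (start + i) % numPartitions.
  def roundRobin[T](rows: Seq[T], numPartitions: Int, start: Int = 0): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => (start + i) % numPartitions }
      .map { case (p, indexed) => p -> indexed.map(_._1) }
}

// With 2 partitions, input order decides which rows land where,
// so a local sort is needed to make the assignment order-independent.
val twoA = RoundRobinSketch.roundRobin(Seq(1, 2, 3), 2) // partition 0 gets 1, 3
val twoB = RoundRobinSketch.roundRobin(Seq(3, 1, 2), 2) // partition 0 gets 3, 2

// With 1 partition, every row lands in partition 0 whatever the input order.
val oneA = RoundRobinSketch.roundRobin(Seq(1, 2, 3), 1)
val oneB = RoundRobinSketch.roundRobin(Seq(3, 1, 2), 1)
```

In this model, skipping the sort for one partition can change the order of rows within that partition, but never which rows it contains.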

## How was this patch tested?

The existing test cases.

Author: Xingbo Jiang <[email protected]>

Closes #20426 from jiangxb1987/repartition1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b375397b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b375397b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b375397b

Branch: refs/heads/master
Commit: b375397b1678b7fe20a0b7f87a7e8b37ae5646ef
Parents: 31bd1da
Author: Xingbo Jiang <[email protected]>
Authored: Tue Jan 30 11:40:42 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jan 30 11:40:42 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala       | 4 ++++
 .../apache/spark/sql/execution/streaming/ForeachSinkSuite.scala  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 76c1fa6..4d95ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -257,7 +257,11 @@ object ShuffleExchangeExec {
       //
       // Currently we following the most straight-forward way that perform a local sort before
       // partitioning.
+      //
+      // Note that we don't perform local sort if the new partitioning has only 1 partition, under
+      // that case all output rows go to the same partition.
       val newRdd = if (SQLConf.get.sortBeforeRepartition &&
+          newPartitioning.numPartitions > 1 &&
           newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
         rdd.mapPartitionsInternal { iter =>
           val recordComparatorSupplier = new Supplier[RecordComparator] {

http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 1248c67..41434e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -162,7 +162,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       val allEvents = ForeachSinkSuite.allEvents()
       assert(allEvents.size === 1)
       assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
-      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 2))
+      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
 
       // `close` should be called with the error
       val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]

