Repository: spark
Updated Branches:
  refs/heads/master 5bce45809 -> dff73bfa5


[SPARK-16052][SQL] Improve `CollapseRepartition` optimizer for Repartition/RepartitionBy

## What changes were proposed in this pull request?

This PR improves `CollapseRepartition` so that it also collapses adjacent 
combinations of **Repartition** and **RepartitionBy** into a single operator. 
It also adds a test suite for this optimizer rule.

**Target Scenario**
```scala
scala> val dsView1 = spark.range(8).repartition(8, $"id")
scala> dsView1.createOrReplaceTempView("dsView1")
scala> sql("select id from dsView1 distribute by id").explain(true)
```

**Before**
```scala
scala> sql("select id from dsView1 distribute by id").explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression ['id]
+- 'Project ['id]
   +- 'UnresolvedRelation `dsView1`

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [id#0L]
+- Project [id#0L]
   +- SubqueryAlias dsview1
      +- RepartitionByExpression [id#0L], 8
         +- Range (0, 8, splits=8)

== Optimized Logical Plan ==
RepartitionByExpression [id#0L]
+- RepartitionByExpression [id#0L], 8
   +- Range (0, 8, splits=8)

== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- Exchange hashpartitioning(id#0L, 8)
   +- *Range (0, 8, splits=8)
```

**After**
```scala
scala> sql("select id from dsView1 distribute by id").explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression ['id]
+- 'Project ['id]
   +- 'UnresolvedRelation `dsView1`

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [id#0L]
+- Project [id#0L]
   +- SubqueryAlias dsview1
      +- RepartitionByExpression [id#0L], 8
         +- Range (0, 8, splits=8)

== Optimized Logical Plan ==
RepartitionByExpression [id#0L]
+- Range (0, 8, splits=8)

== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- *Range (0, 8, splits=8)
```
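
The collapse visible in the optimized plans above can be modeled outside Spark with a few plain case classes. The following is only a minimal sketch under stated assumptions: `Plan`, `Relation`, and `CollapseSketch` are hypothetical names invented for illustration, expressions are reduced to strings, and the bottom-up traversal is hand-written rather than Catalyst's `transformUp`. The four rewrite cases follow the ones this patch adds to `CollapseRepartition`.

```scala
// Simplified stand-ins for Catalyst's logical plan nodes.
// These are hypothetical illustration types, not Spark's own classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
case class RepartitionByExpression(
    exprs: Seq[String],
    child: Plan,
    numPartitions: Option[Int] = None) extends Plan

object CollapseSketch {
  // Bottom-up traversal: collapse the child first, then rewrite the current node.
  def collapse(plan: Plan): Plan = plan match {
    case Repartition(n, shuffle, child) =>
      rewrite(Repartition(n, shuffle, collapse(child)))
    case RepartitionByExpression(exprs, child, n) =>
      rewrite(RepartitionByExpression(exprs, collapse(child), n))
    case leaf => leaf
  }

  private def rewrite(plan: Plan): Plan = plan match {
    // Case 1: adjacent Repartitions, keep the outer (last) one.
    case Repartition(n, shuffle, Repartition(_, _, child)) =>
      Repartition(n, shuffle, child)
    // Case 2: adjacent RepartitionByExpressions, keep the outer (last) one.
    case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), n) =>
      RepartitionByExpression(exprs, child, n)
    // Case 3: Repartition over RepartitionByExpression keeps the inner
    // expressions but takes the outer partition count.
    case Repartition(n, _, r: RepartitionByExpression) =>
      r.copy(numPartitions = Some(n))
    // Case 3: RepartitionByExpression over Repartition drops the inner node.
    case RepartitionByExpression(exprs, Repartition(_, _, child), n) =>
      RepartitionByExpression(exprs, child, n)
    case other => other
  }
}
```

In this model, the target scenario corresponds to `CollapseSketch.collapse(RepartitionByExpression(Seq("id"), RepartitionByExpression(Seq("id"), Relation("range"), Some(8))))`, which leaves only the outer node with `numPartitions = None`, mirroring the single `RepartitionByExpression [id#0L]` in the optimized plan.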

## How was this patch tested?

Pass the Jenkins tests (including a new test suite).

Author: Dongjoon Hyun <[email protected]>

Closes #13765 from dongjoon-hyun/SPARK-16052.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dff73bfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dff73bfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dff73bfa

Branch: refs/heads/master
Commit: dff73bfa5e08c4c065584cfa9655a7839d28ad49
Parents: 5bce458
Author: Dongjoon Hyun <[email protected]>
Authored: Fri Jul 8 16:44:53 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Jul 8 16:44:53 2016 +0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  4 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |  7 +-
 .../sql/catalyst/optimizer/Optimizer.scala      | 17 ++++-
 .../optimizer/CollapseRepartitionSuite.scala    | 78 ++++++++++++++++++++
 .../sql/catalyst/parser/PlanParserSuite.scala   |  6 +-
 5 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dff73bfa/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a0ac7a9..dd670a9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -464,10 +464,10 @@ class DataFrame(object):
         +---+-----+
         |age| name|
         +---+-----+
-        |  5|  Bob|
-        |  5|  Bob|
         |  2|Alice|
+        |  5|  Bob|
         |  2|Alice|
+        |  5|  Bob|
         +---+-----+
         >>> data.rdd.getNumPartitions()
         7

http://git-wip-us.apache.org/repos/asf/spark/blob/dff73bfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 84c9cc8..5181dcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -370,8 +370,11 @@ package object dsl {
         case plan => SubqueryAlias(alias, plan)
       }
 
-      def distribute(exprs: Expression*): LogicalPlan =
-        RepartitionByExpression(exprs, logicalPlan)
+      def repartition(num: Integer): LogicalPlan =
+        Repartition(num, shuffle = true, logicalPlan)
+
+      def distribute(exprs: Expression*)(n: Int = -1): LogicalPlan =
+        RepartitionByExpression(exprs, logicalPlan, numPartitions = if (n < 0) None else Some(n))
 
       def analyze: LogicalPlan =
         EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))

http://git-wip-us.apache.org/repos/asf/spark/blob/dff73bfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 03d15ea..368e9a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -556,12 +556,27 @@ object CollapseProject extends Rule[LogicalPlan] {
 }
 
 /**
- * Combines adjacent [[Repartition]] operators by keeping only the last one.
+ * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations
+ * by keeping only the one.
+ * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
+ * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]].
+ * 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single
+ *    [[RepartitionByExpression]] with the expression and last number of partition.
  */
 object CollapseRepartition extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // Case 1
     case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
       Repartition(numPartitions, shuffle, child)
+    // Case 2
+    case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
+      RepartitionByExpression(exprs, child, numPartitions)
+    // Case 3
+    case Repartition(numPartitions, _, r: RepartitionByExpression) =>
+      r.copy(numPartitions = Some(numPartitions))
+    // Case 3
+    case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
+      RepartitionByExpression(exprs, child, numPartitions)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dff73bfa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
new file mode 100644
index 0000000..8952c72
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class CollapseRepartitionSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("CollapseRepartition", FixedPoint(10),
+        CollapseRepartition) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int)
+
+  test("collapse two adjacent repartitions into one") {
+    val query = testRelation
+      .repartition(10)
+      .repartition(20)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.repartition(20).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("collapse repartition and repartitionBy into one") {
+    val query = testRelation
+      .repartition(10)
+      .distribute('a)(20)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.distribute('a)(20).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("collapse repartitionBy and repartition into one") {
+    val query = testRelation
+      .distribute('a)(20)
+      .repartition(10)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.distribute('a)(10).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("collapse two adjacent repartitionBys into one") {
+    val query = testRelation
+      .distribute('b)(10)
+      .distribute('a)(20)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.distribute('a)(20).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff73bfa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 456948d..fbe236e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -151,9 +151,9 @@ class PlanParserSuite extends PlanTest {
       ("", basePlan),
       (" order by a, b desc", basePlan.orderBy('a.asc, 'b.desc)),
       (" sort by a, b desc", basePlan.sortBy('a.asc, 'b.desc)),
-      (" distribute by a, b", basePlan.distribute('a, 'b)),
-      (" distribute by a sort by b", basePlan.distribute('a).sortBy('b.asc)),
-      (" cluster by a, b", basePlan.distribute('a, 'b).sortBy('a.asc, 'b.asc))
+      (" distribute by a, b", basePlan.distribute('a, 'b)()),
+      (" distribute by a sort by b", basePlan.distribute('a)().sortBy('b.asc)),
+      (" cluster by a, b", basePlan.distribute('a, 'b)().sortBy('a.asc, 'b.asc))
     )
 
     orderSortDistrClusterClauses.foreach {

