Repository: spark
Updated Branches:
refs/heads/master 0ad6ce7e5 -> c55397652
[SPARK-16208][SQL] Add `PropagateEmptyRelation` optimizer
## What changes were proposed in this pull request?
This PR adds a new logical optimizer rule, `PropagateEmptyRelation`, which
collapses logical plans that consist only of empty LocalRelations.
**Optimizer Targets**
1. Binary(or Higher)-node Logical Plans
- Union with all empty children.
- Join with one or two empty children (including Intersect/Except).
2. Unary-node Logical Plans
- Project/Filter/Sample/Sort/Limit/Repartition with all empty children.
- Aggregate with all empty children and without AggregateFunction
expressions such as COUNT.
- Generate with Explode only, because other generators such as Hive UDTFs
may still return results.
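The targets above can be illustrated with a minimal, self-contained sketch. It is a toy model, not the Catalyst implementation: `Plan`, `Local`, `CountAll`, and `ToyPropagateEmpty` are hypothetical names introduced here, and only a subset of the join types and unary nodes is modelled.

```scala
// Toy stand-ins for logical plan nodes (hypothetical, not Catalyst classes).
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

sealed trait Plan
case class Local(rows: Seq[Int]) extends Plan                  // plays the role of LocalRelation
case class Filter(child: Plan) extends Plan                    // unary node that never adds rows
case class Union(children: Seq[Plan]) extends Plan
case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan
case class CountAll(child: Plan) extends Plan                  // global COUNT: yields a row even on empty input

object ToyPropagateEmpty {
  private def isEmpty(p: Plan): Boolean = p match {
    case Local(rows) => rows.isEmpty
    case _ => false
  }

  private val empty: Plan = Local(Seq.empty)

  // Bottom-up rewrite: children are rewritten first, then the node itself.
  def apply(plan: Plan): Plan = {
    val rewritten = plan match {
      case Filter(c) => Filter(apply(c))
      case Union(cs) => Union(cs.map(apply))
      case Join(l, r, jt) => Join(apply(l), apply(r), jt)
      case CountAll(c) => CountAll(apply(c))
      case leaf => leaf
    }
    rewritten match {
      case Union(cs) if cs.forall(isEmpty) => empty
      case Join(l, r, jt) if isEmpty(l) || isEmpty(r) => jt match {
        case Inner => empty
        case LeftOuter if isEmpty(l) => empty                  // the preserved side is empty
        case RightOuter if isEmpty(r) => empty
        case FullOuter if isEmpty(l) && isEmpty(r) => empty
        case _ => rewritten                                    // the other side may still produce rows
      }
      case Filter(c) if isEmpty(c) => empty
      case other => other                                      // e.g. CountAll is left untouched
    }
  }
}
```

For example, `ToyPropagateEmpty(Join(Local(Seq(1)), Filter(Local(Seq.empty)), Inner))` collapses to `Local(Seq.empty)`, while `CountAll(Local(Seq.empty))` is returned unchanged because a global aggregate still produces a row.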
**Sample Query**
```sql
WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a,b
FROM t1, t2
WHERE a=b
GROUP BY a,b
HAVING a>1
ORDER BY a,b
```
**Before**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
   +- *HashAggregate(keys=[a#0, b#1], functions=[])
      +- Exchange hashpartitioning(a#0, b#1, 200)
         +- *HashAggregate(keys=[a#0, b#1], functions=[])
            +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
               :- *Filter (isnotnull(a#0) && (a#0 > 1))
               :  +- LocalTableScan [a#0]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  +- *Filter (isnotnull(b#1) && (b#1 > 1))
                     +- LocalTableScan <empty>, [b#1]
```
**After**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]
```
## How was this patch tested?
Pass the Jenkins tests (including a new testsuite).
Author: Dongjoon Hyun <[email protected]>
Closes #13906 from dongjoon-hyun/SPARK-16208.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5539765
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5539765
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5539765
Branch: refs/heads/master
Commit: c55397652ad1c6d047a8b8eb7fd92a8a1dc66306
Parents: 0ad6ce7
Author: Dongjoon Hyun <[email protected]>
Authored: Fri Jul 1 22:13:56 2016 +0800
Committer: Cheng Lian <[email protected]>
Committed: Fri Jul 1 22:13:56 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 3 +-
.../optimizer/PropagateEmptyRelation.scala | 78 +++++++++
.../optimizer/PropagateEmptyRelationSuite.scala | 162 +++++++++++++++++++
3 files changed, 242 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 842d6bc..9ee1735 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -113,7 +113,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
Batch("Typed Filter Optimization", fixedPoint,
CombineTypedFilters) ::
Batch("LocalRelation", fixedPoint,
- ConvertToLocalRelation) ::
+ ConvertToLocalRelation,
+ PropagateEmptyRelation) ::
Batch("OptimizeCodegen", Once,
OptimizeCodegen(conf)) ::
Batch("RewriteSubquery", Once,
http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
new file mode 100644
index 0000000..50076b1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/**
+ * Collapse plans consisting empty local relations generated by [[PruneFilters]].
+ * 1. Binary(or Higher)-node Logical Plans
+ * - Union with all empty children.
+ * - Join with one or two empty children (including Intersect/Except).
+ * 2. Unary-node Logical Plans
+ * - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
+ * - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ * - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ */
+object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
+ private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+ case p: LocalRelation => p.data.isEmpty
+ case _ => false
+ }
+
+ private def containsAggregateExpression(e: Expression): Boolean = {
+ e.collectFirst { case _: AggregateFunction => () }.isDefined
+ }
+
+ private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case p: Union if p.children.forall(isEmptyLocalRelation) =>
+ empty(p)
+
+ case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
+ case Inner => empty(p)
+ // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+ // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+ case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
+ case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
+ case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
+ case _ => p
+ }
+
+ case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
+ case _: Project => empty(p)
+ case _: Filter => empty(p)
+ case _: Sample => empty(p)
+ case _: Sort => empty(p)
+ case _: GlobalLimit => empty(p)
+ case _: LocalLimit => empty(p)
+ case _: Repartition => empty(p)
+ case _: RepartitionByExpression => empty(p)
+ // AggregateExpressions like COUNT(*) return their results like 0.
+ case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+ // Generators like Hive-style UDTF may return their records within `close`.
+ case Generate(_: Explode, _, _, _, _, _) => empty(p)
+ case _ => p
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
new file mode 100644
index 0000000..c549832
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PropagateEmptyRelationSuite extends PlanTest {
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("PropagateEmptyRelation", Once,
+ CombineUnions,
+ ReplaceDistinctWithAggregate,
+ ReplaceExceptWithAntiJoin,
+ ReplaceIntersectWithSemiJoin,
+ PushDownPredicate,
+ PruneFilters,
+ PropagateEmptyRelation) :: Nil
+ }
+
+ object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("OptimizeWithoutPropagateEmptyRelation", Once,
+ CombineUnions,
+ ReplaceDistinctWithAggregate,
+ ReplaceExceptWithAntiJoin,
+ ReplaceIntersectWithSemiJoin,
+ PushDownPredicate,
+ PruneFilters) :: Nil
+ }
+
+ val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
+ val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))
+
+ test("propagate empty relation through Union") {
+ val query = testRelation1
+ .where(false)
+ .union(testRelation2.where(false))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("propagate empty relation through Join") {
+ // Testcases are tuples of (left predicate, right predicate, joinType, correct answer)
+ // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation.
+ val testcases = Seq(
+ (true, true, Inner, None),
+ (true, true, LeftOuter, None),
+ (true, true, RightOuter, None),
+ (true, true, FullOuter, None),
+ (true, true, LeftAnti, None),
+ (true, true, LeftSemi, None),
+
+ (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, LeftOuter, None),
+ (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, FullOuter, None),
+ (true, false, LeftAnti, None),
+ (true, false, LeftSemi, None),
+
+ (false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, RightOuter, None),
+ (false, true, FullOuter, None),
+ (false, true, LeftAnti, Some(LocalRelation('a.int))),
+ (false, true, LeftSemi, Some(LocalRelation('a.int))),
+
+ (false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, LeftAnti, Some(LocalRelation('a.int))),
+ (false, false, LeftSemi, Some(LocalRelation('a.int)))
+ )
+
+ testcases.foreach { case (left, right, jt, answer) =>
+ val query = testRelation1
+ .where(left)
+ .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr))
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer =
+ answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
+ comparePlans(optimized, correctAnswer)
+ }
+ }
+
+ test("propagate empty relation through UnaryNode") {
+ val query = testRelation1
+ .where(false)
+ .select('a)
+ .groupBy('a)('a)
+ .where('a > 1)
+ .orderBy('a.asc)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("don't propagate non-empty local relation") {
+ val query = testRelation1
+ .where(true)
+ .groupBy('a)('a)
+ .where('a > 1)
+ .orderBy('a.asc)
+ .select('a)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation1
+ .where('a > 1)
+ .groupBy('a)('a)
+ .orderBy('a.asc)
+ .select('a)
+
+ comparePlans(optimized, correctAnswer.analyze)
+ }
+
+ test("propagate empty relation through Aggregate without aggregate function") {
+ val query = testRelation1
+ .where(false)
+ .groupBy('a)('a, ('a + 1).as('x))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int, 'x.int).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("don't propagate empty relation through Aggregate with aggregate function") {
+ val query = testRelation1
+ .where(false)
+ .groupBy('a)(count('a))
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}