This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 74f1176 [SPARK-27815][SQL] Predicate pushdown in one pass for cascading joins
74f1176 is described below
commit 74f1176311676145d5d8669daacf67e5308f68b5
Author: Yesheng Ma <[email protected]>
AuthorDate: Wed Jul 3 09:01:16 2019 -0700
[SPARK-27815][SQL] Predicate pushdown in one pass for cascading joins
## What changes were proposed in this pull request?
This PR makes the predicate pushdown logic in the Catalyst optimizer more
efficient by unifying the two existing rules `PushDownPredicate` and
`PushPredicateThroughJoin`. Previously, pushing a predicate down through
queries such as `Filter(Join(Join(Join)))` required n passes. This patch
reduces that to a single pass.
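To make the shape concrete, here is a sketch of such a query written with the
same Catalyst test DSL the new suite uses (the relations and attribute names
are illustrative, not taken from this patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val x = LocalRelation('a.int, 'b.int).subquery('x)
    val y = LocalRelation('c.int, 'd.int).subquery('y)
    // Filter(Join(Join(...))): one top-level predicate above cascading joins.
    // Under the old rules each fixed-point iteration moved the filter down one
    // join, so n joins took n iterations; the unified rule needs one pass.
    val query = x.join(y).join(y).where("x.a".attr === 1)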
To make this actually work, we need to unify a few rules, namely
`CombineFilters`, `PushDownPredicate`, and `PushPredicateThroughJoin`.
Otherwise, cases such as `Filter(Join(Filter(Join)))` would still require
several passes to fully push down predicates. This unification is done by
composing the rules' transformations as partial functions, which keeps the
code change minimal and lets the existing UTs be reused.
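The composition itself is ordinary Scala `PartialFunction` chaining via
`orElse`. A minimal, self-contained sketch of the pattern, with a toy plan
type standing in for Catalyst's `LogicalPlan` (names are illustrative; the
real rules expose their rewrites as `applyLocally`):

    object PushdownSketch {
      sealed trait Plan
      case class Filter(cond: String, child: Plan) extends Plan
      case class Join(left: Plan, right: Plan) extends Plan
      case class Relation(name: String) extends Plan

      // Each rule exposes its rewrite as a PartialFunction, so a single
      // tree traversal can try every rule at every node it visits.
      val combineFilters: PartialFunction[Plan, Plan] = {
        case Filter(c1, Filter(c2, child)) => Filter(s"($c1) AND ($c2)", child)
      }
      val pushThroughJoin: PartialFunction[Plan, Plan] = {
        case Filter(c, Join(l, r)) => Join(Filter(c, l), r) // toy: push left only
      }
      // The unified rule tries each rewrite in order at each node.
      val pushDownPredicates: PartialFunction[Plan, Plan] =
        combineFilters.orElse(pushThroughJoin)
    }

Because `transform` walks the tree top-down and re-examines the rewritten
children, a filter pushed below one join is immediately eligible to be pushed
below the next, which is what removes the need for one fixed-point iteration
per join level.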
Results show that this optimization can improve Catalyst optimization time
by 16.5%. For queries with more joins the gain is larger: for TPC-DS q64,
for example, the boost is 49.2%.
## How was this patch tested?
Existing UTs + a new UT for the new rule.
Closes #24956 from yeshengm/fixed-point-opt.
Authored-by: Yesheng Ma <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 30 +++-
.../optimizer/PushDownLeftSemiAntiJoin.scala | 10 +-
.../catalyst/optimizer/ColumnPruningSuite.scala | 2 +-
.../optimizer/FilterPushdownOnePassSuite.scala | 183 +++++++++++++++++++++
.../catalyst/optimizer/FilterPushdownSuite.scala | 2 +-
.../InferFiltersFromConstraintsSuite.scala | 2 +-
.../catalyst/optimizer/JoinOptimizationSuite.scala | 2 +-
.../sql/catalyst/optimizer/JoinReorderSuite.scala | 2 +-
.../optimizer/LeftSemiAntiJoinPushDownSuite.scala | 2 +-
.../catalyst/optimizer/OptimizerLoggingSuite.scala | 12 +-
.../optimizer/OptimizerRuleExclusionSuite.scala | 6 +-
.../optimizer/PropagateEmptyRelationSuite.scala | 4 +-
.../sql/catalyst/optimizer/PruneFiltersSuite.scala | 2 +-
.../sql/catalyst/optimizer/SetOperationSuite.scala | 2 +-
.../optimizer/StarJoinCostBasedReorderSuite.scala | 2 +-
.../catalyst/optimizer/StarJoinReorderSuite.scala | 2 +-
.../spark/sql/execution/SparkOptimizer.scala | 4 +-
17 files changed, 235 insertions(+), 34 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 17b4ff7..c99d2c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -63,8 +63,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
- PushPredicateThroughJoin,
- PushDownPredicate,
+ PushDownPredicates,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
LimitPushDown,
@@ -911,7 +910,9 @@ object CombineUnions extends Rule[LogicalPlan] {
* one conjunctive predicate.
*/
object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+ val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
// The query execution/optimization does not guarantee the expressions are evaluated in order.
// We only can combine them if and only if both are deterministic.
case Filter(fc, nf @ Filter(nc, grandChild)) if fc.deterministic && nc.deterministic =>
@@ -997,14 +998,29 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
}
/**
+ * The unified version for predicate pushdown of normal operators and joins.
+ * This rule improves performance of predicate pushdown for cascading joins such as:
+ * Filter-Join-Join-Join. Most predicates can be pushed down in a single pass.
+ */
+object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ CombineFilters.applyLocally
+ .orElse(PushPredicateThroughNonJoin.applyLocally)
+ .orElse(PushPredicateThroughJoin.applyLocally)
+ }
+}
+
+/**
* Pushes [[Filter]] operators through many operators iff:
* 1) the operator is deterministic
* 2) the predicate is deterministic and the operator will not change any of rows.
*
* This heuristic is valid assuming the expression evaluation cost is minimal.
*/
-object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+ val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
// SPARK-13473: We can't push the predicate down when the underlying projection output non-
// deterministic field(s). Non-deterministic expressions are essentially stateful. This
// implies that, for a given input row, the output are determined by the expression's initial
@@ -1221,7 +1237,9 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
}
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+ val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
// push the where condition down into join filter
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, hint)) =>
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
index 0c38900..606db85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * This rule is a variant of [[PushDownPredicate]] which can handle
+ * This rule is a variant of [[PushPredicateThroughNonJoin]] which can handle
* pushing down Left semi and Left Anti joins below the following operators.
* 1) Project
* 2) Window
* 3) Union
* 4) Aggregate
- * 5) Other permissible unary operators. please see [[PushDownPredicate.canPushThrough]].
+ * 5) Other permissible unary operators. please see [[PushPredicateThroughNonJoin.canPushThrough]].
*/
object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -42,7 +42,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
// No join condition, just push down the Join below Project
p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
} else {
- val aliasMap = PushDownPredicate.getAliasMap(p)
+ val aliasMap = PushPredicateThroughNonJoin.getAliasMap(p)
val newJoinCond = if (aliasMap.nonEmpty) {
Option(replaceAlias(joinCond.get, aliasMap))
} else {
@@ -55,7 +55,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(_), _, _)
if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty &&
!agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
- val aliasMap = PushDownPredicate.getAliasMap(agg)
+ val aliasMap = PushPredicateThroughNonJoin.getAliasMap(agg)
val canPushDownPredicate = (predicate: Expression) => {
val replaced = replaceAlias(predicate, aliasMap)
predicate.references.nonEmpty &&
@@ -94,7 +94,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
// LeftSemi/LeftAnti over UnaryNode
case join @ Join(u: UnaryNode, rightOp, LeftSemiOrAnti(_), _, _)
- if PushDownPredicate.canPushThrough(u) && u.expressions.forall(_.deterministic) =>
+ if PushPredicateThroughNonJoin.canPushThrough(u) && u.expressions.forall(_.deterministic) =>
val validAttrs = u.child.outputSet ++ rightOp.outputSet
pushDownJoin(join, _.references.subsetOf(validAttrs), _.reduce(And))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b738f30..78ae131 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -32,7 +32,7 @@ class ColumnPruningSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("Column pruning", FixedPoint(100),
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
ColumnPruning,
RemoveNoopOperators,
CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala
new file mode 100644
index 0000000..6f1280c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/**
+ * This test suite ensures that [[PushDownPredicates]] actually does predicate pushdown in
+ * an efficient manner. This is enforced by asserting that a single pass of predicate
+ * pushdown can push all predicates down as far as possible.
+ */
+class FilterPushdownOnePassSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ // this batch must reach expected state in one pass
+ Batch("Filter Pushdown One Pass", Once,
+ ReorderJoin,
+ PushDownPredicates
+ ) :: Nil
+ }
+
+ val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation2 = LocalRelation('a.int, 'd.int, 'e.int)
+
+ test("really simple predicate push down") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val originalQuery = x.join(y).where("x.a".attr === 1)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer = x.where("x.a".attr === 1).join(y).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down conjunctive predicates") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val originalQuery = x.join(y).where("x.a".attr === 1 && "y.d".attr < 1)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer = x.where("x.a".attr === 1).join(y.where("y.d".attr < 1)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down predicates for simple joins") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val originalQuery =
+ x.where("x.c".attr < 0)
+ .join(y.where("y.d".attr > 1))
+ .where("x.a".attr === 1 && "y.d".attr < 2)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.where("x.c".attr < 0 && "x.a".attr === 1)
+ .join(y.where("y.d".attr > 1 && "y.d".attr < 2)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down top-level filters for cascading joins") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val originalQuery =
+ y.join(x).join(x).join(x).join(x).join(x).where("y.d".attr === 0)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer = y.where("y.d".attr === 0).join(x).join(x).join(x).join(x).join(x).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down predicates for tree-like joins") {
+ val x = testRelation1.subquery('x)
+ val y1 = testRelation2.subquery('y1)
+ val y2 = testRelation2.subquery('y2)
+
+ val originalQuery =
+ y1.join(x).join(x)
+ .join(y2.join(x).join(x))
+ .where("y1.d".attr === 0 && "y2.d".attr === 3)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ y1.where("y1.d".attr === 0).join(x).join(x)
+ .join(y2.where("y2.d".attr === 3).join(x).join(x)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down through join and project") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val originalQuery =
+ x.where('a > 0).select('a, 'b)
+ .join(y.where('d < 100).select('e))
+ .where("x.a".attr < 100)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.where('a > 0 && 'a < 100).select('a, 'b)
+ .join(y.where('d < 100).select('e)).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down through deep projects") {
+ val x = testRelation1.subquery('x)
+
+ val originalQuery =
+ x.select(('a + 1) as 'a1, 'b)
+ .select(('a1 + 1) as 'a2, 'b)
+ .select(('a2 + 1) as 'a3, 'b)
+ .select(('a3 + 1) as 'a4, 'b)
+ .select('b)
+ .where('b > 0)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.where('b > 0)
+ .select(('a + 1) as 'a1, 'b)
+ .select(('a1 + 1) as 'a2, 'b)
+ .select(('a2 + 1) as 'a3, 'b)
+ .select(('a3 + 1) as 'a4, 'b)
+ .select('b).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("push down through aggregate and join") {
+ val x = testRelation1.subquery('x)
+ val y = testRelation2.subquery('y)
+
+ val left = x
+ .where('c > 0)
+ .groupBy('a)('a, count('b))
+ .subquery('left)
+ val right = y
+ .where('d < 0)
+ .groupBy('a)('a, count('d))
+ .subquery('right)
+ val originalQuery = left
+ .join(right).where("left.a".attr < 100 && "right.a".attr < 100)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.where('c > 0 && 'a < 100).groupBy('a)('a, count('b))
+ .join(y.where('d < 0 && 'a < 100).groupBy('a)('a, count('d)))
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index cf4e9fc..2db4667f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -35,7 +35,7 @@ class FilterPushdownSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown", FixedPoint(10),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
BooleanSimplification,
PushPredicateThroughJoin,
CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index a40ba2d..974bc78 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -31,7 +31,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val batches =
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
InferFiltersFromConstraints,
CombineFilters,
SimplifyBinaryComparison,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index c570643..0f93305 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -34,7 +34,7 @@ class JoinOptimizationSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
BooleanSimplification,
ReorderJoin,
PushPredicateThroughJoin,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 18516ee..43e5bad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -35,7 +35,7 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
EliminateResolvedHint) ::
Batch("Operator Optimizations", FixedPoint(100),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
ReorderJoin,
PushPredicateThroughJoin,
ColumnPruning,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index 00709ad..f6d1898 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -35,7 +35,7 @@ class LeftSemiPushdownSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown", FixedPoint(10),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
BooleanSimplification,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index dd7e29d..7a432d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -34,7 +34,7 @@ class OptimizerLoggingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Optimizer Batch", FixedPoint(100),
- PushDownPredicate, ColumnPruning, CollapseProject) ::
+ PushPredicateThroughNonJoin, ColumnPruning, CollapseProject) ::
Batch("Batch Has No Effect", Once,
ColumnPruning) :: Nil
}
@@ -99,7 +99,7 @@ class OptimizerLoggingSuite extends PlanTest {
verifyLog(
level._2,
Seq(
- PushDownPredicate.ruleName,
+ PushPredicateThroughNonJoin.ruleName,
ColumnPruning.ruleName,
CollapseProject.ruleName))
}
@@ -123,15 +123,15 @@ class OptimizerLoggingSuite extends PlanTest {
test("test log rules") {
val rulesSeq = Seq(
- Seq(PushDownPredicate.ruleName,
+ Seq(PushPredicateThroughNonJoin.ruleName,
ColumnPruning.ruleName,
CollapseProject.ruleName).reduce(_ + "," + _) ->
- Seq(PushDownPredicate.ruleName,
+ Seq(PushPredicateThroughNonJoin.ruleName,
ColumnPruning.ruleName,
CollapseProject.ruleName),
- Seq(PushDownPredicate.ruleName,
+ Seq(PushPredicateThroughNonJoin.ruleName,
ColumnPruning.ruleName).reduce(_ + "," + _) ->
- Seq(PushDownPredicate.ruleName,
+ Seq(PushPredicateThroughNonJoin.ruleName,
ColumnPruning.ruleName),
CollapseProject.ruleName ->
Seq(CollapseProject.ruleName),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
index 7587776..2a87803 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
@@ -96,21 +96,21 @@ class OptimizerRuleExclusionSuite extends PlanTest {
val optimizer = new SimpleTestOptimizer() {
override def defaultBatches: Seq[Batch] =
Batch("push", Once,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PushPredicateThroughJoin,
PushProjectionThroughUnion) ::
Batch("pull", Once,
PullupCorrelatedPredicates) :: Nil
override def nonExcludableRules: Seq[String] =
- PushDownPredicate.ruleName ::
+ PushPredicateThroughNonJoin.ruleName ::
PullupCorrelatedPredicates.ruleName :: Nil
}
verifyExcludedRules(
optimizer,
Seq(
- PushDownPredicate.ruleName,
+ PushPredicateThroughNonJoin.ruleName,
PushProjectionThroughUnion.ruleName,
PullupCorrelatedPredicates.ruleName))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index d395bba..9c7d4c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -35,7 +35,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceDistinctWithAggregate,
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PruneFilters,
PropagateEmptyRelation,
CollapseProject) :: Nil
@@ -48,7 +48,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceDistinctWithAggregate,
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PruneFilters,
CollapseProject) :: Nil
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
index 6d1a05f..526a5b0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -35,7 +35,7 @@ class PruneFiltersSuite extends PlanTest {
Batch("Filter Pushdown and Pruning", Once,
CombineFilters,
PruneFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PushPredicateThroughJoin) :: Nil
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 3d3e361..ccc30b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -34,7 +34,7 @@ class SetOperationSuite extends PlanTest {
Batch("Union Pushdown", FixedPoint(5),
CombineUnions,
PushProjectionThroughUnion,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
PruneFilters) :: Nil
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index baae934..f8c48d5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -33,7 +33,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
val batches =
Batch("Operator Optimizations", FixedPoint(100),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
ReorderJoin,
PushPredicateThroughJoin,
ColumnPruning,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 9dc653b..10e970d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -52,7 +52,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val batches =
Batch("Operator Optimizations", FixedPoint(100),
CombineFilters,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
ReorderJoin,
PushPredicateThroughJoin,
ColumnPruning,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index c35e5de..4ae2194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, Optimizer, PushDownPredicate, RemoveNoopOperators}
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, Optimizer, PushPredicateThroughNonJoin, RemoveNoopOperators}
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.execution.python.{ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -37,7 +37,7 @@ class SparkOptimizer(
// The eval-python node may be between Project/Filter and the scan node, which breaks
// column pruning and filter push-down. Here we rerun the related optimizer rules.
ColumnPruning,
- PushDownPredicate,
+ PushPredicateThroughNonJoin,
RemoveNoopOperators) :+
Batch("Prune File Source Table Partitions", Once,
PruneFileSourcePartitions) :+
Batch("Schema Pruning", Once, SchemaPruning)) ++