Repository: spark Updated Branches: refs/heads/branch-2.0 7ffafa3bf -> f7839e47c
[SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates ## What changes were proposed in this pull request? This patch fixes a minor correctness issue impacting the pushdown of filters beneath aggregates. Specifically, if a filter condition references no grouping or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed beneath an aggregate. Intuitively, the only case where you can push a filter beneath an aggregate is when that filter is deterministic and is defined over the grouping columns / expressions, since in that case the filter is acting to exclude entire groups from the query (like a `HAVING` clause). The existing code would only push deterministic filters beneath aggregates when all of the filter's references were grouping columns, but this logic missed the case where a filter has no references. For example, `WHERE false` is deterministic but is independent of the actual data. This patch fixes this minor bug by adding a new check to ensure that we don't push filters beneath aggregates when those filters don't reference any columns. ## How was this patch tested? New regression test in FilterPushdownSuite. Author: Josh Rosen <[email protected]> Closes #15289 from JoshRosen/SPARK-17712. (cherry picked from commit 37eb9184f1e9f1c07142c66936671f4711ef407d) Signed-off-by: Josh Rosen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7839e47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7839e47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7839e47 Branch: refs/heads/branch-2.0 Commit: f7839e47c3bda86d61c3b2be72c168aab4a5674f Parents: 7ffafa3 Author: Josh Rosen <[email protected]> Authored: Wed Sep 28 19:03:05 2016 -0700 Committer: Josh Rosen <[email protected]> Committed: Thu Sep 29 12:05:46 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/optimizer/FilterPushdownSuite.scala | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 35b122d..4c06038 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1071,7 +1071,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) - replaced.references.subsetOf(aggregate.child.outputSet) + cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet) } val stayUp = rest ++ containingNonDeterministic http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 55836f9..019f132 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("SPARK-17712: aggregate: don't push down filters that are data-independent") { + val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty) + .select('a, 'b) + .groupBy('a)(count('a)) + .where(false) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .select('a, 'b) + .groupBy('a)(count('a)) + .where(false) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("broadcast hint") { val originalQuery = BroadcastHint(testRelation) .where('a === 2L && 'b + Rand(10).as("rnd") === 3) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
