Repository: spark
Updated Branches:
  refs/heads/master d3ef69332 -> 27d69a057


[SPARK-11973] [SQL] push filter through aggregation with alias and literals

Currently, a filter can't be pushed through an aggregation that contains aliases
or literals; this patch fixes that.

After this patch, the run time of TPC-DS query 4 goes down from 141 seconds to
13 seconds (a 10x improvement).

cc nongli  yhuai

Author: Davies Liu <[email protected]>

Closes #9959 from davies/push_filter2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d69a05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d69a05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d69a05

Branch: refs/heads/master
Commit: 27d69a0573ed55e916a464e268dcfd5ecc6ed849
Parents: d3ef693
Author: Davies Liu <[email protected]>
Authored: Thu Nov 26 00:19:42 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Thu Nov 26 00:19:42 2015 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  9 ++++
 .../sql/catalyst/optimizer/Optimizer.scala      | 28 +++++++----
 .../optimizer/FilterPushdownSuite.scala         | 53 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6855747..304b438 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -65,6 +65,15 @@ trait PredicateHelper {
     }
   }
 
+  // Substitute any known alias from a map.
+  protected def replaceAlias(
+      condition: Expression,
+      aliases: AttributeMap[Expression]): Expression = {
+    condition.transform {
+      case a: Attribute => aliases.getOrElse(a, a)
+    }
+  }
+
   /**
    * Returns true if `expr` can be evaluated using only the output of `plan`.  
This method
    * can be used to determine when it is acceptable to move expression 
evaluation within a query

http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f4dba67..52f609b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -640,20 +640,14 @@ object PushPredicateThroughProject extends 
Rule[LogicalPlan] with PredicateHelpe
           filter
         } else {
           // Push down the small conditions without nondeterministic 
expressions.
-          val pushedCondition = deterministic.map(replaceAlias(_, 
aliasMap)).reduce(And)
+          val pushedCondition =
+            deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
           Filter(nondeterministic.reduce(And),
             project.copy(child = Filter(pushedCondition, grandChild)))
         }
       }
   }
 
-  // Substitute any attributes that are produced by the child projection, so 
that we safely
-  // eliminate it.
-  private def replaceAlias(condition: Expression, sourceAliases: 
AttributeMap[Expression]) = {
-    condition.transform {
-      case a: Attribute => sourceAliases.getOrElse(a, a)
-    }
-  }
 }
 
 /**
@@ -690,12 +684,24 @@ object PushPredicateThroughAggregate extends 
Rule[LogicalPlan] with PredicateHel
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition,
         aggregate @ Aggregate(groupingExpressions, aggregateExpressions, 
grandChild)) =>
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition 
{
-        conjunct => conjunct.references subsetOf 
AttributeSet(groupingExpressions)
+
+      def hasAggregate(expression: Expression): Boolean = expression match {
+        case agg: AggregateExpression => true
+        case other => expression.children.exists(hasAggregate)
+      }
+      // Create a map of aliases for expressions that do not contain an 
AggregateExpression
+      val aliasMap = AttributeMap(aggregateExpressions.collect {
+        case a: Alias if !hasAggregate(a.child) => (a.toAttribute, a.child)
+      })
+
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition 
{ conjunct =>
+        val replaced = replaceAlias(conjunct, aliasMap)
+        replaced.references.subsetOf(grandChild.outputSet) && 
replaced.deterministic
       }
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
-        val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, 
grandChild))
+        val replaced = replaceAlias(pushDownPredicate, aliasMap)
+        val withPushdown = aggregate.copy(child = Filter(replaced, grandChild))
         stayUp.reduceOption(And).map(Filter(_, 
withPushdown)).getOrElse(withPushdown)
       } else {
         filter

http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0290faf..0128c22 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -697,4 +697,57 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("aggregate: push down filters with alias") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
+      .where(('c === 2L || 'aa > 4) && 'aa < 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .where('a + 1 < 3)
+      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
+      .where('c === 2L || 'aa > 4)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("aggregate: push down filters with literal") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
+      .where('c === 2L && 'd === "s")
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .where("s" === "s")
+      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
+      .where('c === 2L)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("aggregate: don't push down filters which is nondeterministic") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
+      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
+      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to