Repository: spark
Updated Branches:
  refs/heads/master 738f134bf -> 91b1ef28d


[SPARK-16164][SQL] Update `CombineFilters` to try to construct predicates with 
child predicate first

## What changes were proposed in this pull request?

This PR changes `CombineFilters` to compose the final predicate condition as 
(`child predicate` AND `parent predicate`) instead of (`parent predicate` AND 
`child predicate`), so the child predicate comes first in the combined 
condition. This is a best-effort approach: other optimization rules may still 
destroy this order by reorganizing conjunctive predicates.
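
As a rough illustration of the rewrite described above, here is a minimal, 
dependency-free sketch. The types `Expr`, `Pred`, and `And` and the helpers 
`splitConjuncts` and `combine` are hypothetical stand-ins invented for this 
example; they only mimic the shape of the rule and are not the Catalyst classes.

```scala
// Toy expression tree; illustrative stand-ins, not Catalyst classes.
sealed trait Expr
final case class Pred(name: String) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object CombineSketch {
  // Flatten nested conjunctions into individual conjuncts, left to right.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Merge a parent filter condition (fc) with its child filter condition (nc):
  // drop parent conjuncts the child already has, then put the child first.
  def combine(fc: Expr, nc: Expr): Expr = {
    val childConjuncts: Set[Expr] = splitConjuncts(nc).toSet
    splitConjuncts(fc)
      .filterNot(childConjuncts.contains)
      .reduceOption[Expr]((a, b) => And(a, b)) match {
        case Some(ac) => And(nc, ac) // child predicate first
        case None     => nc          // the parent adds nothing new
      }
  }
}
```

With `fc = Pred("b = 1")` as the parent condition and `nc = Pred("a = 1")` as 
the child condition, `combine` yields `And(Pred("a = 1"), Pred("b = 1"))`.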

**Reported Error Scenario**
Chris McCubbin reported a bug that occurs when `StringIndexer` is used in an ML 
pipeline with additional filters. It seems that filter pushdown changes the 
ordering of the predicates in the logical plan.
```scala
import org.apache.spark.ml.feature._
val df1 = (0 until 3).map(_.toString).toDF
val indexer = new StringIndexer()
  .setInputCol("value")
  .setOutputCol("idx")
  .setHandleInvalid("skip")
  .fit(df1)
val df2 = (0 until 5).map(_.toString).toDF
val predictions = indexer.transform(df2)
predictions.show() // this is okay
predictions.where('idx > 2).show() // this will throw an exception
```

Please see the notebook at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html for error messages.
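
The following sketch suggests why the order of the two predicates can matter 
in a scenario like this one. It assumes, without the source confirming it, 
that the "skip" handling acts as a guard filter and that the later predicate 
is only safe on rows that passed the guard. All names (`labelToIndex`, 
`isKnown`, `indexAbove`) are hypothetical and model the situation in plain 
Scala rather than with Spark APIs.

```scala
object OrderingSketch {
  // Labels seen at fit time (hypothetical stand-in for a fitted indexer).
  val labelToIndex: Map[String, Double] =
    Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

  // Child predicate: the guard that drops unseen labels.
  def isKnown(value: String): Boolean = labelToIndex.contains(value)

  // Parent predicate: looks up the index; Map.apply throws
  // NoSuchElementException for a label that was never seen.
  def indexAbove(value: String, threshold: Double): Boolean =
    labelToIndex(value) > threshold

  // Child first: && short-circuits, so the lookup is skipped for unseen labels.
  def childFirst(value: String): Boolean =
    isKnown(value) && indexAbove(value, 1.0)

  // Parent first: the lookup runs before the guard and can throw.
  def parentFirst(value: String): Boolean =
    indexAbove(value, 1.0) && isKnown(value)
}
```

Here `OrderingSketch.childFirst("4")` returns `false`, while 
`OrderingSketch.parentFirst("4")` throws, even though both conjunctions are 
logically equivalent.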

## How was this patch tested?

Passes the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <[email protected]>

Closes #13872 from dongjoon-hyun/SPARK-16164.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91b1ef28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91b1ef28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91b1ef28

Branch: refs/heads/master
Commit: 91b1ef28d134313d7b6faaffa1c390f3ca4455d0
Parents: 738f134
Author: Dongjoon Hyun <[email protected]>
Authored: Thu Jun 23 15:27:43 2016 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Thu Jun 23 15:27:43 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala  |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala  | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91b1ef28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6e78ad0..2bca31d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1002,7 +1002,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
       (ExpressionSet(splitConjunctivePredicates(fc)) --
         ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
-          Filter(And(ac, nc), grandChild)
+          Filter(And(nc, ac), grandChild)
         case None =>
           nf
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/91b1ef28/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b8f28e8..9cb49e7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,24 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
+    val originalQuery =
+      testRelation
+        .where('a === 1)
+        .select('a, 'b)
+        .where('b === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 1 && 'b === 1)
+        .select('a, 'b)
+        .analyze
+
+    // We can not use comparePlans here because it normalized the plan.
+    assert(optimized == correctAnswer)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation


