Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2ce240cfe -> 6cb24de99


[SPARK-16164][SQL] Update `CombineFilters` to try to construct predicates with 
child predicate first

## What changes were proposed in this pull request?

This PR changes `CombineFilters` to compose the final predicate as 
(`child predicate` AND `parent predicate`) instead of (`parent predicate` 
AND `child predicate`). This is a best-effort approach: other optimization 
rules may still destroy this order by reorganizing conjunctive predicates.
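
The reordering can be sketched without Spark. The following is a minimal toy
model, not the Catalyst implementation: `Expr`, `Pred`, `Plan`, `Relation`,
`splitConjuncts`, and `CombineFiltersSketch` are hypothetical stand-ins
introduced only for illustration, and a plain `Set` stands in for Catalyst's
`ExpressionSet`.

```scala
// Toy stand-ins for Catalyst expressions and plans; these are NOT Spark's classes.
sealed trait Expr
case class Pred(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

sealed trait Plan
case class Relation(name: String) extends Plan
case class Filter(condition: Expr, child: Plan) extends Plan

object CombineFiltersSketch {
  // Flatten nested conjunctions into their individual conjuncts, left to right.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // Merge Filter(parent, Filter(child, grandChild)) into a single Filter,
  // placing the child (inner) condition first.
  def combine(plan: Plan): Plan = plan match {
    case Filter(parentCond, childFilter @ Filter(childCond, grandChild)) =>
      val childConjuncts = splitConjuncts(childCond).toSet
      // Keep only the parent conjuncts that the child does not already enforce.
      val extra = splitConjuncts(parentCond).filterNot(childConjuncts.contains)
      extra.reduceOption[Expr](And(_, _)) match {
        case Some(ac) => Filter(And(childCond, ac), grandChild)
        case None => childFilter // the parent adds nothing new
      }
    case other => other
  }
}
```

In this sketch, combining `Filter(Pred("b = 1"), Filter(Pred("a = 1"), Relation("t")))`
yields a single filter whose condition is `And(Pred("a = 1"), Pred("b = 1"))`,
with the child predicate on the left.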

**Reported Error Scenario**
Chris McCubbin reported a bug when using `StringIndexer` in an ML pipeline with 
additional filters. It seems that during filter pushdown, the optimizer changed 
the ordering of the predicates in the logical plan.
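```scala
// Run in spark-shell, where spark.implicits._ is already in scope
// (needed for toDF and the 'idx column syntax).
import org.apache.spark.ml.feature._
val df1 = (0 until 3).map(_.toString).toDF // labels "0", "1", "2"
val indexer = new StringIndexer()
  .setInputCol("value")
  .setOutputCol("idx")
  .setHandleInvalid("skip") // drop rows whose label was not seen during fit
  .fit(df1)
val df2 = (0 until 5).map(_.toString).toDF // adds unseen labels "3" and "4"
val predictions = indexer.transform(df2)
predictions.show() // this is okay
predictions.where('idx > 2).show() // this will throw an exception
```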

Please see the notebook at
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html
for error messages.
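
Why predicate order matters here can be illustrated with plain Scala
collections. This is a rough sketch under stated assumptions, not Spark code:
it assumes conjuncts are evaluated left to right with short-circuiting, that
the "skip" filter only checks whether a label was seen during fitting, and that
the index lookup throws on an unseen label. `PredicateOrderSketch`, `isKnown`,
`indexAbove`, and the label-to-index mapping are hypothetical, and the
threshold is 1.0 rather than 2 so that the working case returns a row.

```scala
object PredicateOrderSketch {
  // Labels seen while fitting; an assumed stand-in for the fitted model's mapping.
  val labelToIndex: Map[String, Double] = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

  // Child predicate: keeps only labels seen during fitting (the "skip" behaviour).
  def isKnown(label: String): Boolean = labelToIndex.contains(label)

  // Parent predicate: looks the label up first, so it throws on an unseen label.
  def indexAbove(threshold: Double)(label: String): Boolean =
    labelToIndex.getOrElse(
      label,
      throw new NoSuchElementException(s"Unseen label: $label")) > threshold

  val input: Seq[String] = (0 until 5).map(_.toString)

  // Child first: && short-circuits, so the lookup never sees "3" or "4".
  def childFirst: Seq[String] =
    input.filter(l => isKnown(l) && indexAbove(1.0)(l))

  // Parent first: the lookup runs on every row and throws on "3".
  def parentFirst: Seq[String] =
    input.filter(l => indexAbove(1.0)(l) && isKnown(l))
}
```

With these definitions, `PredicateOrderSketch.childFirst` returns only `"2"`,
while `PredicateOrderSketch.parentFirst` throws a `NoSuchElementException`.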

## How was this patch tested?

Pass the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <[email protected]>

Closes #13872 from dongjoon-hyun/SPARK-16164.

(cherry picked from commit 91b1ef28d134313d7b6faaffa1c390f3ca4455d0)
Signed-off-by: Xiangrui Meng <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cb24de9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cb24de9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cb24de9

Branch: refs/heads/branch-2.0
Commit: 6cb24de99e011ce97fb7d3513a2760b0d1a85a45
Parents: 2ce240c
Author: Dongjoon Hyun <[email protected]>
Authored: Thu Jun 23 15:27:43 2016 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Thu Jun 23 15:27:50 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala  |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala  | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6cb24de9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6190f7a..6b10484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -963,7 +963,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
       (ExpressionSet(splitConjunctivePredicates(fc)) --
         ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
-          Filter(And(ac, nc), grandChild)
+          Filter(And(nc, ac), grandChild)
         case None =>
           nf
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6cb24de9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b8f28e8..9cb49e7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,24 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
+    val originalQuery =
+      testRelation
+        .where('a === 1)
+        .select('a, 'b)
+        .where('b === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 1 && 'b === 1)
+        .select('a, 'b)
+        .analyze
+
+    // We can not use comparePlans here because it normalized the plan.
+    assert(optimized == correctAnswer)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation

