Repository: spark Updated Branches: refs/heads/branch-2.1 d128b6a39 -> b94fb284b
[SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results ## What changes were proposed in this pull request? This PR fixes the code in the Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in rule `RewritePredicateSubquery`. Example: The query select a1,b1 from t1 where (a1,b1) not in (select a2,b2 from t2); has the predicate (a1, b1) = (a2, b2) rewritten from (before this fix): Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17))) to (after this fix): Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17)))) ## How was this patch tested? sql/test, catalyst/test and new test cases in SQLQueryTestSuite. Author: Nattavut Sutyanyong <nsy....@gmail.com> Closes #16467 from nsyca/19017. (cherry picked from commit cdb691eb4da5dbf52dccf1da0ae57a9b1874f010) Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b94fb284 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b94fb284 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b94fb284 Branch: refs/heads/branch-2.1 Commit: b94fb284b93c763cf6e604705509a4e970d6ce6e Parents: d128b6a Author: Nattavut Sutyanyong <nsy....@gmail.com> Authored: Tue Jan 24 23:31:06 2017 +0100 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Tue Jan 24 23:31:19 2017 +0100 ---------------------------------------------------------------------- .../spark/sql/catalyst/optimizer/subquery.scala | 10 +++- .../in-subquery/not-in-multiple-columns.sql | 55 ++++++++++++++++++ .../in-subquery/not-in-multiple-columns.sql.out | 59 ++++++++++++++++++++ .../apache/spark/sql/SQLQueryTestSuite.scala | 7 ++- .../org/apache/spark/sql/SubquerySuite.scala | 6 +- 5 files changed, 131 insertions(+), 6 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f14aaab..4d62cce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -68,8 +68,14 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or) - Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get))) + // Expand the NOT IN expression with the NULL-aware semantics + // to its full form. That is from: + // (a1,b1,...) = (a2,b2,...) + // to + // (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ... 
+ val joinConds = splitConjunctivePredicates(joinCond.get) + val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And) + Join(outerPlan, sub, LeftAnti, Option(pairs)) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql new file mode 100644 index 0000000..db66850 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql @@ -0,0 +1,55 @@ +-- This file contains test cases for NOT IN subquery with multiple columns. + +-- The data sets are populated as follows: +-- 1) When T1.A1 = T2.A2 +-- 1.1) T1.B1 = T2.B2 +-- 1.2) T1.B1 = T2.B2 returns false +-- 1.3) T1.B1 is null +-- 1.4) T2.B2 is null +-- 2) When T1.A1 = T2.A2 returns false +-- 3) When T1.A1 is null +-- 4) When T2.A2 is null + +-- T1.A1 T1.B1 T2.A2 T2.B2 +-- ----- ----- ----- ----- +-- 1 1 1 1 (1.1) +-- 1 3 (1.2) +-- 1 null 1 null (1.3 & 1.4) +-- +-- 2 1 1 1 (2) +-- null 1 (3) +-- null 3 (4) + +create temporary view t1 as select * from values + (1, 1), (2, 1), (null, 1), + (1, 3), (null, 3), + (1, null), (null, 2) +as t1(a1, b1); + +create temporary view t2 as select * from values + (1, 1), + (null, 3), + (1, null) +as t2(a2, b2); + +-- multiple columns in NOT IN +-- TC 01.01 +select a1,b1 +from t1 +where (a1,b1) not in (select a2,b2 + from t2); + +-- multiple columns with expressions in NOT IN +-- TC 01.02 +select a1,b1 +from t1 +where (a1-1,b1) not in (select a2,b2 + from t2); + +-- multiple columns with expressions in NOT IN +-- TC 01.03 
+select a1,b1 +from t1 +where (a1,b1) not in (select a2+1,b2 + from t2); + http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out new file mode 100644 index 0000000..756c378 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out @@ -0,0 +1,59 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 5 + + +-- !query 0 +create temporary view t1 as select * from values + (1, 1), (2, 1), (null, 1), + (1, 3), (null, 3), + (1, null), (null, 2) +as t1(a1, b1) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +create temporary view t2 as select * from values + (1, 1), + (null, 3), + (1, null) +as t2(a2, b2) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +select a1,b1 +from t1 +where (a1,b1) not in (select a2,b2 + from t2) +-- !query 2 schema +struct<a1:int,b1:int> +-- !query 2 output +2 1 + + +-- !query 3 +select a1,b1 +from t1 +where (a1-1,b1) not in (select a2,b2 + from t2) +-- !query 3 schema +struct<a1:int,b1:int> +-- !query 3 output +1 1 + + +-- !query 4 +select a1,b1 +from t1 +where (a1,b1) not in (select a2+1,b2 + from t2) +-- !query 4 schema +struct<a1:int,b1:int> +-- !query 4 output +1 1 http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 1a4049f..fdf940a 100644 
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -163,7 +163,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { s"-- Number of queries: ${outputs.size}\n\n\n" + outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n" } - stringToFile(new File(testCase.resultFile), goldenOutput) + val resultFile = new File(testCase.resultFile); + val parent = resultFile.getParentFile(); + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) } // Read back the golden file. http://git-wip-us.apache.org/repos/asf/spark/blob/b94fb284/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 2ef8b18..25dbecb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -263,12 +263,12 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1, 2.0) :: Row(1, 2.0) :: Nil) checkAnswer( - sql("select * from l where a not in (select c from t where b < d)"), - Row(1, 2.0) :: Row(1, 2.0) :: Row(3, 3.0) :: Nil) + sql("select * from l where (a, b) not in (select c, d from t) and a < 4"), + Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) :: Row(3, 3.0) :: Nil) // Empty sub-query checkAnswer( - sql("select * from l where a not in (select c from r where c > 10 and b < d)"), + sql("select * from l where (a, b) not in (select c, d from r where c > 10)"), Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) :: Row(3, 3.0) :: Row(null, null) :: Row(null, 5.0) :: Row(6, null) :: Nil) 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org