Repository: spark Updated Branches: refs/heads/master e06320626 -> d8104158a
[SPARK-17100] [SQL] fix Python udf in filter on top of outer join ## What changes were proposed in this pull request? In optimizer, we try to evaluate the condition to see whether it's nullable or not, but some expressions are not evaluable, we should check that before evaluate it. ## How was this patch tested? Added regression tests. Author: Davies Liu <[email protected]> Closes #15103 from davies/udf_join. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8104158 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8104158 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8104158 Branch: refs/heads/master Commit: d8104158a922d86dd4f00e50d5d7dddc7b777a21 Parents: e063206 Author: Davies Liu <[email protected]> Authored: Mon Sep 19 13:24:16 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Mon Sep 19 13:24:16 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/tests.py | 8 ++++++++ .../org/apache/spark/sql/catalyst/optimizer/joins.scala | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d8104158/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1be0b72..c2171c2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -328,6 +328,14 @@ class SQLTests(ReusedPySparkTestCase): [row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect() self.assertEqual(tuple(row), (6, 5)) + def test_udf_in_filter_on_top_of_outer_join(self): + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1)]) + right = self.spark.createDataFrame([Row(a=1)]) + df = left.join(right, on='a', how='left_outer') + df = df.withColumn('b', udf(lambda x: 'x')(df.a)) + self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')]) + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() http://git-wip-us.apache.org/repos/asf/spark/blob/d8104158/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 1621bff..2626057 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -109,7 +109,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false val attributes = e.references.toSeq val emptyRow = new GenericInternalRow(attributes.length) - val v = BindReferences.bindReference(e, attributes).eval(emptyRow) + val boundE = BindReferences.bindReference(e, attributes) + if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false + val v = boundE.eval(emptyRow) v == null || v == false } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
