Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dc61ed406 -> bde1d4133


[SPARK-18634][PYSPARK][SQL] Corruption and Correctness issues with exploding Python UDFs

## What changes were proposed in this pull request?

As reported in the Jira, exploding the result of a Python UDF in Spark SQL can produce corrupted or incorrect results.

The following test code reproduces the problem. Note: the Jira reports that this code returns wrong results; as tested on the master branch, however, it throws an exception instead, so it cannot return any result at all.

    >>> from pyspark.sql.functions import *
    >>> from pyspark.sql.types import *
    >>>
    >>> df = spark.range(10)
    >>>
    >>> def return_range(value):
    ...   return [(i, str(i)) for i in range(value - 1, value + 1)]
    ...
    >>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
    ...                                                      StructField("string_val", StringType())])))
    >>>
    >>> df.select("id", explode(range_udf(df.id))).show()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
        print(self._jdf.showString(n, 20))
      File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 
1133, in __call__
      File "/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, 
in get_return_value py4j.protocol.Py4JJavaError: An error occurred while 
calling o126.showString.: java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
        at 
org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)

The cause of this issue is that `ExtractPythonUDFs` inserts a `BatchEvalPythonExec` node to evaluate Python UDFs in batches. `BatchEvalPythonExec` adds extra output attributes (e.g., `pythonUDF0`) to the original plan. In the case above, the original `Range` has only one output, `id`; after `ExtractPythonUDFs` runs, the inserted `BatchEvalPythonExec` has two outputs, `id` and `pythonUDF0`.
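
The injected attribute can be observed by printing the physical plan of the failing query with the standard `explain()` API (a diagnostic sketch; the exact plan text varies by version). The printed plan contains a `BatchEvalPythonExec` node whose output carries both `id` and the injected `pythonUDF0`:

    >>> df.select("id", explode(range_udf(df.id))).explain()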

Because the output of `GenerateExec` is fixed during the analysis phase, in the case above it is the combination of `id` (the output of `Range`) and `col` (the generator's output). In the planning phase, however, we replace `GenerateExec`'s child plan with the `BatchEvalPythonExec`, which carries additional output attributes.

This causes no problem without whole-stage codegen, because during evaluation the additional attributes are projected out of the final output of `GenerateExec`.

However, now that `GenerateExec` supports whole-stage codegen, the framework feeds all outputs of the child plan into `GenerateExec`. When `GenerateExec`'s output data is consumed (i.e., when `consume` is called), the number of output attributes then differs from the number of output variables in the generated code, which triggers the assertion failure shown above.
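
One way to confirm the failure is specific to whole-stage codegen is to disable it via the `spark.sql.codegen.wholeStage` flag and rerun the query (a diagnostic sketch; with codegen off, the extra attributes are projected out as described above and the query completes):

    >>> spark.conf.set("spark.sql.codegen.wholeStage", "false")
    >>> df.select("id", explode(range_udf(df.id))).show()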

To solve this issue, this patch gives only the generator's output to `GenerateExec` after the analysis phase. `GenerateExec` then derives its own output as the combination of its child plan's output and the generator's output, so its output remains correct even when its child is replaced during planning.
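
With this fix, the reproduction above runs to completion and each exploded row lines up with its `id`. For example, with a simpler UDF (a sketch based on the assertions in the new PySpark test below; the exact `Row` rendering may differ):

    >>> df = spark.range(3)
    >>> range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
    >>> df.select("id", explode(range_udf(df.id))).collect()
    [Row(id=0, col=-1), Row(id=0, col=0), Row(id=1, col=0), Row(id=1, col=1), Row(id=2, col=1), Row(id=2, col=2)]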

## How was this patch tested?

Added test cases to PySpark.


Author: Liang-Chi Hsieh <[email protected]>

Closes #16120 from viirya/fix-py-udf-with-generator.

(cherry picked from commit 3ba69b64852ccbf6d4ec05a021bc20616a09f574)
Signed-off-by: Herman van Hovell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bde1d413
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bde1d413
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bde1d413

Branch: refs/heads/branch-2.0
Commit: bde1d4133b2445935d19b57da38e6eee11e6d90c
Parents: dc61ed4
Author: Liang-Chi Hsieh <[email protected]>
Authored: Mon Dec 5 17:50:43 2016 -0800
Committer: Herman van Hovell <[email protected]>
Committed: Mon Dec 5 17:51:09 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                     | 20 ++++++++++++++++++++
 .../plans/logical/basicLogicalOperators.scala   | 12 ++++++------
 .../spark/sql/execution/GenerateExec.scala      | 15 ++++++++++++---
 .../spark/sql/execution/SparkStrategies.scala   |  3 ++-
 4 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d413/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a71457a..b3cf72b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -379,6 +379,26 @@ class SQLTests(ReusedPySparkTestCase):
         row = df.select(explode(f(*df))).groupBy().sum().first()
         self.assertEqual(row[0], 10)
 
+        df = self.spark.range(3)
+        res = df.select("id", explode(f(df.id))).collect()
+        self.assertEqual(res[0][0], 1)
+        self.assertEqual(res[0][1], 0)
+        self.assertEqual(res[1][0], 2)
+        self.assertEqual(res[1][1], 0)
+        self.assertEqual(res[2][0], 2)
+        self.assertEqual(res[2][1], 1)
+
+        range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
+        res = df.select("id", explode(range_udf(df.id))).collect()
+        self.assertEqual(res[0][0], 0)
+        self.assertEqual(res[0][1], -1)
+        self.assertEqual(res[1][0], 0)
+        self.assertEqual(res[1][1], 0)
+        self.assertEqual(res[2][0], 1)
+        self.assertEqual(res[2][1], 0)
+        self.assertEqual(res[3][0], 1)
+        self.assertEqual(res[3][1], 1)
+
     def test_udf_with_order_by_and_limit(self):
         from pyspark.sql.functions import udf
         my_copy = udf(lambda x: x, IntegerType())

http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d413/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 2fdaa05..355e8e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -91,13 +91,13 @@ case class Generate(
 
   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
 
-  def output: Seq[Attribute] = {
-    val qualified = qualifier.map(q =>
-      // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Some(q)))
-    ).getOrElse(generatorOutput)
+  val qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q =>
+    // prepend the new qualifier to the existed one
+    generatorOutput.map(a => a.withQualifier(Some(q)))
+  }.getOrElse(generatorOutput)
 
-    if (join) child.output ++ qualified else qualified
+  def output: Seq[Attribute] = {
+    if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d413/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 39189a2..2598b59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -44,17 +44,26 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  *              it.
 * @param outer when true, each input row will be output at least once, even if the output of the
 *              given `generator` is empty. `outer` has no effect when `join` is false.
- * @param output the output attributes of this node, which constructed in analysis phase,
- *               and we can not change it, as the parent node bound with it already.
+ * @param generatorOutput the qualified output attributes of the generator of this node, which
+ *                        constructed in analysis phase, and we can not change it, as the
+ *                        parent node bound with it already.
  */
 case class GenerateExec(
     generator: Generator,
     join: Boolean,
     outer: Boolean,
-    output: Seq[Attribute],
+    generatorOutput: Seq[Attribute],
     child: SparkPlan)
   extends UnaryExecNode {
 
+  override def output: Seq[Attribute] = {
+    if (join) {
+      child.output ++ generatorOutput
+    } else {
+      generatorOutput
+    }
+  }
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bde1d413/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ccfb954..dd611a8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -399,7 +399,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.UnionExec(unionChildren.map(planLater)) :: Nil
       case g @ logical.Generate(generator, join, outer, _, _, child) =>
         execution.GenerateExec(
-          generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
+          generator, join = join, outer = outer, g.qualifiedGeneratorOutput,
+          planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case r : logical.Range =>

