Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7feb79085 -> acf56c5db


[SPARK-15327] [SQL] fix split expression in whole stage codegen

## What changes were proposed in this pull request?

Right now, we split the code for expressions into multiple functions when 
it exceeds 64k, which requires that the expressions use a Row object. 
This is not true for whole-stage codegen, so the generated code fails to 
compile after being split.

This PR will not split the code in whole-stage codegen.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <[email protected]>

Closes #13235 from davies/fix_nested_codegen.

(cherry picked from commit 2df6ca848e99b90acd11c3a3de342fa4d77015d6)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acf56c5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acf56c5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acf56c5d

Branch: refs/heads/branch-2.0
Commit: acf56c5db803f327a341b524d9fbf178eb3dc711
Parents: 7feb790
Author: Davies Liu <[email protected]>
Authored: Tue May 31 15:36:02 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Tue May 31 15:36:15 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     |  4 ++++
 .../sql/execution/WholeStageCodegenExec.scala   |  1 +
 .../execution/aggregate/TungstenAggregate.scala |  2 ++
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 24 ++++++++++++++++++++
 4 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 86883d7..93e477e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -560,6 +560,10 @@ class CodegenContext {
    * @param expressions the codes to evaluate expressions.
    */
   def splitExpressions(row: String, expressions: Seq[String]): String = {
+    if (row == null) {
+      // Cannot split these expressions because they are not created from a 
row object.
+      return expressions.mkString("\n")
+    }
     val blocks = new ArrayBuffer[String]()
     val blockBuilder = new StringBuilder()
     for (code <- expressions) {

http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 2aec931..cd9ba7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -130,6 +130,7 @@ trait CodegenSupport extends SparkPlan {
         }
         val evaluateInputs = evaluateVariables(outputVars)
         // generate the code to create a UnsafeRow
+        ctx.INPUT_ROW = row
         ctx.currentVars = outputVars
         val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
         val code = s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index d2dc80a..905e93c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -599,6 +599,8 @@ case class TungstenAggregate(
 
     // create grouping key
     ctx.currentVars = input
+    // make sure that the generated code will not be splitted as multiple 
functions
+    ctx.INPUT_ROW = null
     val unsafeRowKeyCode = GenerateUnsafeProjection.createCode(
       ctx, groupingExpressions.map(e => 
BindReferences.bindReference[Expression](e, child.output)))
     val vectorizedRowKeys = ctx.generateExpressions(

http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1ddb586..91d9302 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2483,6 +2483,30 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
     }
   }
 
+  test("SPARK-15327: fail to compile generated code with complex data 
structure") {
+    withTempDir{ dir =>
+      val json =
+        """
+          |{"h": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 
3}],
+          |"b": [{"e": "test", "count": 1}]}}, "d": {"b": {"c": [{"e": 
"adfgd"}],
+          |"a": [{"e": "testing", "count": 3}], "b": [{"e": "test", "count": 
1}]}},
+          |"c": {"b": {"c": [{"e": "adfgd"}], "a": [{"count": 3}],
+          |"b": [{"e": "test", "count": 1}]}}, "a": {"b": {"c": [{"e": 
"adfgd"}],
+          |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}},
+          |"e": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 
3}],
+          |"b": [{"e": "test", "count": 1}]}}, "g": {"b": {"c": [{"e": 
"adfgd"}],
+          |"a": [{"e": "testing", "count": 3}], "b": [{"e": "test", "count": 
1}]}},
+          |"f": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 
3}],
+          |"b": [{"e": "test", "count": 1}]}}, "b": {"b": {"c": [{"e": 
"adfgd"}],
+          |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
+          |
+        """.stripMargin
+      val rdd = sparkContext.parallelize(Array(json))
+      spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
+      spark.read.parquet(dir.toString).collect()
+    }
+  }
+
   test("SPARK-14986: Outer lateral view with empty generate expression") {
     checkAnswer(
       sql("select nil from (select 1 as x ) x lateral view outer 
explode(array()) n as nil"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to