Repository: spark Updated Branches: refs/heads/branch-2.0 7feb79085 -> acf56c5db
[SPARK-15327] [SQL] fix split expression in whole stage codegen ## What changes were proposed in this pull request? Right now, we will split the code for expressions into multiple functions when it exceed 64k, which requires that the the expressions are using Row object, but this is not true for whole-state codegen, it will fail to compile after splitted. This PR will not split the code in whole-stage codegen. ## How was this patch tested? Added regression tests. Author: Davies Liu <[email protected]> Closes #13235 from davies/fix_nested_codegen. (cherry picked from commit 2df6ca848e99b90acd11c3a3de342fa4d77015d6) Signed-off-by: Davies Liu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acf56c5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acf56c5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acf56c5d Branch: refs/heads/branch-2.0 Commit: acf56c5db803f327a341b524d9fbf178eb3dc711 Parents: 7feb790 Author: Davies Liu <[email protected]> Authored: Tue May 31 15:36:02 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Tue May 31 15:36:15 2016 -0700 ---------------------------------------------------------------------- .../expressions/codegen/CodeGenerator.scala | 4 ++++ .../sql/execution/WholeStageCodegenExec.scala | 1 + .../execution/aggregate/TungstenAggregate.scala | 2 ++ .../org/apache/spark/sql/SQLQuerySuite.scala | 24 ++++++++++++++++++++ 4 files changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 86883d7..93e477e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -560,6 +560,10 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { + if (row == null) { + // Cannot split these expressions because they are not created from a row object. + return expressions.mkString("\n") + } val blocks = new ArrayBuffer[String]() val blockBuilder = new StringBuilder() for (code <- expressions) { http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 2aec931..cd9ba7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -130,6 +130,7 @@ trait CodegenSupport extends SparkPlan { } val evaluateInputs = evaluateVariables(outputVars) // generate the code to create a UnsafeRow + ctx.INPUT_ROW = row ctx.currentVars = outputVars val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) val code = s""" http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index d2dc80a..905e93c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -599,6 +599,8 @@ case class TungstenAggregate( // create grouping key ctx.currentVars = input + // make sure that the generated code will not be splitted as multiple functions + ctx.INPUT_ROW = null val unsafeRowKeyCode = GenerateUnsafeProjection.createCode( ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output))) val vectorizedRowKeys = ctx.generateExpressions( http://git-wip-us.apache.org/repos/asf/spark/blob/acf56c5d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1ddb586..91d9302 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2483,6 +2483,30 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("SPARK-15327: fail to compile generated code with complex data structure") { + withTempDir{ dir => + val json = + """ + |{"h": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 3}], + |"b": [{"e": "test", "count": 1}]}}, "d": {"b": {"c": [{"e": "adfgd"}], + |"a": [{"e": "testing", "count": 3}], "b": [{"e": "test", "count": 1}]}}, + |"c": {"b": {"c": [{"e": "adfgd"}], "a": [{"count": 3}], + |"b": [{"e": "test", "count": 1}]}}, "a": {"b": {"c": [{"e": "adfgd"}], + |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}, + |"e": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 3}], + |"b": [{"e": "test", "count": 1}]}}, "g": {"b": {"c": [{"e": "adfgd"}], + |"a": [{"e": "testing", "count": 3}], "b": [{"e": "test", "count": 1}]}}, + |"f": {"b": {"c": [{"e": "adfgd"}], "a": [{"e": "testing", "count": 3}], + |"b": [{"e": "test", "count": 1}]}}, "b": {"b": {"c": [{"e": "adfgd"}], + |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}' + | + """.stripMargin + val rdd = sparkContext.parallelize(Array(json)) + spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString) + spark.read.parquet(dir.toString).collect() + } + } + test("SPARK-14986: Outer lateral view with empty generate expression") { checkAnswer( sql("select nil from (select 1 as x ) x lateral view outer explode(array()) n as nil"), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
