Repository: spark
Updated Branches:
refs/heads/branch-2.3 a0a7e41cf -> b9b35b959
[SPARK-25084][SQL][BACKPORT-2.3] "distribute by" on multiple columns (wrap in
brackets) may lead to codegen issue
## What changes were proposed in this pull request?
Backport #22066 to branch-2.3.
Branch 2.3 uses a different API here, so this line differs in the backport:
```scala
|${ctx.JAVA_INT} $childResult = 0;
```
"distribute by" on multiple columns (wrapped in brackets) may lead to a codegen
issue.
Simple way to reproduce:
```scala
val df = spark.range(1000)
val columns = (0 until 400).map{ i => s"id as id$i" }
val distributeExprs = (0 until 100).map(c => s"id$c").mkString(",")
df.selectExpr(columns : _*).createTempView("test")
spark.sql(s"select * from test distribute by ($distributeExprs)").count()
```
## How was this patch tested?
Unit tests (UT) run in Jenkins.
Closes #22077 from LantaoJin/SPARK-25084_2.3.
Authored-by: LantaoJin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9b35b95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9b35b95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9b35b95
Branch: refs/heads/branch-2.3
Commit: b9b35b9598dde5d8b2e47b40a0ca310527f31763
Parents: a0a7e41
Author: LantaoJin <[email protected]>
Authored: Mon Aug 13 08:51:12 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Aug 13 08:51:12 2018 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/expressions/hash.scala | 23 +++++++++++++++-----
.../org/apache/spark/sql/SQLQuerySuite.scala | 12 ++++++++++
2 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b9b35b95/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 055ebf6..5aa0325 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -404,14 +404,15 @@ abstract class HashExpression[E] extends Expression {
input: String,
result: String,
fields: Array[StructField]): String = {
+ val tmpInput = ctx.freshName("input")
val fieldsHash = fields.zipWithIndex.map { case (field, index) =>
- nullSafeElementHash(input, index.toString, field.nullable,
field.dataType, result, ctx)
+ nullSafeElementHash(tmpInput, index.toString, field.nullable,
field.dataType, result, ctx)
}
val hashResultType = ctx.javaType(dataType)
- ctx.splitExpressions(
+ val code = ctx.splitExpressions(
expressions = fieldsHash,
funcName = "computeHashForStruct",
- arguments = Seq("InternalRow" -> input, hashResultType -> result),
+ arguments = Seq("InternalRow" -> tmpInput, hashResultType -> result),
returnType = hashResultType,
makeSplitFunction = body =>
s"""
@@ -419,6 +420,10 @@ abstract class HashExpression[E] extends Expression {
|return $result;
""".stripMargin,
foldFunctions = _.map(funcCall => s"$result =
$funcCall;").mkString("\n"))
+ s"""
+ |final InternalRow $tmpInput = $input;
+ |$code
+ """.stripMargin
}
@tailrec
@@ -769,10 +774,11 @@ case class HiveHash(children: Seq[Expression]) extends
HashExpression[Int] {
input: String,
result: String,
fields: Array[StructField]): String = {
+ val tmpInput = ctx.freshName("input")
val childResult = ctx.freshName("childResult")
val fieldsHash = fields.zipWithIndex.map { case (field, index) =>
val computeFieldHash = nullSafeElementHash(
- input, index.toString, field.nullable, field.dataType, childResult,
ctx)
+ tmpInput, index.toString, field.nullable, field.dataType, childResult,
ctx)
s"""
|$childResult = 0;
|$computeFieldHash
@@ -780,10 +786,10 @@ case class HiveHash(children: Seq[Expression]) extends
HashExpression[Int] {
""".stripMargin
}
- s"${ctx.JAVA_INT} $childResult = 0;\n" + ctx.splitExpressions(
+ val code = ctx.splitExpressions(
expressions = fieldsHash,
funcName = "computeHashForStruct",
- arguments = Seq("InternalRow" -> input, ctx.JAVA_INT -> result),
+ arguments = Seq("InternalRow" -> tmpInput, ctx.JAVA_INT -> result),
returnType = ctx.JAVA_INT,
makeSplitFunction = body =>
s"""
@@ -792,6 +798,11 @@ case class HiveHash(children: Seq[Expression]) extends
HashExpression[Int] {
|return $result;
""".stripMargin,
foldFunctions = _.map(funcCall => s"$result =
$funcCall;").mkString("\n"))
+ s"""
+ |final InternalRow $tmpInput = $input;
+ |${ctx.JAVA_INT} $childResult = 0;
+ |$code
+ """.stripMargin
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b9b35b95/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5aabc98..8011348 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2806,4 +2806,16 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
checkAnswer(df, Seq(Row(3, 99, 1)))
}
}
+
+ test("SPARK-25084: 'distribute by' on multiple columns may lead to codegen
issue") {
+ withView("spark_25084") {
+ val count = 1000
+ val df = spark.range(count)
+ val columns = (0 until 400).map{ i => s"id as id$i" }
+ val distributeExprs = (0 until 100).map(c => s"id$c").mkString(",")
+ df.selectExpr(columns : _*).createTempView("spark_25084")
+ assert(
+ spark.sql(s"select * from spark_25084 distribute by
($distributeExprs)").count === count)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]