This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8f0c75cbbab [SPARK-43113][SQL][3.3] Evaluate stream-side variables
when generating code for a bound condition
8f0c75cbbab is described below
commit 8f0c75cbbab0cb76a30272a157b4f4cc02cab444
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Apr 24 09:58:15 2023 +0900
[SPARK-43113][SQL][3.3] Evaluate stream-side variables when generating code
for a bound condition
### What changes were proposed in this pull request?
This is a back-port of #40766 and #40881.
In `JoinCodegenSupport#getJoinCondition`, evaluate any referenced
stream-side variables before using them in the generated code.
This patch doesn't evaluate the passed stream-side variables directly, but
instead evaluates a copy (`streamVars2`). This is because
`SortMergeJoin#codegenFullOuter` will want to evaluate the stream-side vars
within a different scope than the condition check, so we mustn't delete the
initialization code from the original `ExprCode` instances.
### Why are the changes needed?
When a bound condition of a full outer join references the same stream-side
column more than once, wholestage codegen generates bad code.
For example, the following query fails with a compilation error:
```
create or replace temp view v1 as
select * from values
(1, 1),
(2, 2),
(3, 1)
as v1(key, value);
create or replace temp view v2 as
select * from values
(1, 22, 22),
(3, -1, -1),
(7, null, null)
as v2(a, b, c);
select *
from v1
full outer join v2
on key = a
and value > b
and value > c;
```
The error is:
```
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
277, Column 9: Redefinition of local variable "smj_isNull_7"
```
The same error occurs with code generated from ShuffledHashJoinExec:
```
select /*+ SHUFFLE_HASH(v2) */ *
from v1
full outer join v2
on key = a
and value > b
and value > c;
```
In this case, the error is:
```
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
174, Column 5: Redefinition of local variable "shj_value_1"
```
Neither `SortMergeJoin#codegenFullOuter` nor
`ShuffledHashJoinExec#doProduce` evaluates the stream-side variables before
calling `consumeFullOuterJoinRow#getJoinCondition`. As a result,
`getJoinCondition` generates definition/initialization code for each referenced
stream-side variable at the point of use. If a stream-side variable is used
more than once in the bound condition, the definition/initialization code is
generated more than once, resulting in the "Redefinition of local variable" error.
In the end, the query succeeds, since Spark disables wholestage codegen and
tries again.
(In the case of other join-type/strategy pairs, either the implementations
don't call `JoinCodegenSupport#getJoinCondition`, or the stream-side variables
are pre-evaluated before the call is made, so no error happens in those cases.)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests.
Closes #40917 from bersprockets/full_join_codegen_issue_br33.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/execution/joins/JoinCodegenSupport.scala | 12 ++++++--
.../scala/org/apache/spark/sql/JoinSuite.scala | 35 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
index 75f0a359a79..a7d1edefcd6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
@@ -42,13 +42,19 @@ trait JoinCodegenSupport extends CodegenSupport with
BaseJoinExec {
buildRow: Option[String] = None): (String, String, Seq[ExprCode]) = {
val buildSideRow = buildRow.getOrElse(ctx.freshName("buildRow"))
val buildVars = genOneSideJoinVars(ctx, buildSideRow, buildPlan,
setDefaultValue = false)
+ // We want to evaluate the passed streamVars. However, evaluation modifies
the contained
+ // ExprCode instances, which may surprise the caller to this method (in
particular,
+ // full outer join will want to evaluate streamVars in a different scope
than the
+ // condition check). Because of this, we first make a copy.
+ val streamVars2 = streamVars.map(_.copy())
val checkCondition = if (condition.isDefined) {
val expr = condition.get
- // evaluate the variables from build side that used by condition
- val eval = evaluateRequiredVariables(buildPlan.output, buildVars,
expr.references)
+ // evaluate the variables that are used by the condition
+ val eval = evaluateRequiredVariables(streamPlan.output ++
buildPlan.output,
+ streamVars2 ++ buildVars, expr.references)
// filter the output via condition
- ctx.currentVars = streamVars ++ buildVars
+ ctx.currentVars = streamVars2 ++ buildVars
val ev =
BindReferences.bindReference(expr, streamPlan.output ++
buildPlan.output).genCode(ctx)
val skipRow = s"${ev.isNull} || !${ev.value}"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 4a8421a2211..286fd5ab957 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1440,4 +1440,39 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
}
}
+
+ def dupStreamSideColTest(hint: String, check: SparkPlan => Unit): Unit = {
+ val query =
+ s"""select /*+ ${hint}(r) */ *
+ |from testData2 l
+ |full outer join testData3 r
+ |on l.a = r.a
+ |and l.b < (r.b + 1)
+ |and l.b < (r.a + 1)""".stripMargin
+ val df = sql(query)
+ val plan = df.queryExecution.executedPlan
+ check(plan)
+ val expected = Row(1, 1, null, null) ::
+ Row(1, 2, null, null) ::
+ Row(null, null, 1, null) ::
+ Row(2, 1, 2, 2) ::
+ Row(2, 2, 2, 2) ::
+ Row(3, 1, null, null) ::
+ Row(3, 2, null, null) :: Nil
+ checkAnswer(df, expected)
+ }
+
+ test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SMJ)") {
+ def check(plan: SparkPlan): Unit = {
+ assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
+ }
+ dupStreamSideColTest("MERGE", check)
+ }
+
+ test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SHJ)") {
+ def check(plan: SparkPlan): Unit = {
+ assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+ }
+ dupStreamSideColTest("SHUFFLE_HASH", check)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]