This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8f0c75cbbab [SPARK-43113][SQL][3.3] Evaluate stream-side variables 
when generating code for a bound condition
8f0c75cbbab is described below

commit 8f0c75cbbab0cb76a30272a157b4f4cc02cab444
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Apr 24 09:58:15 2023 +0900

    [SPARK-43113][SQL][3.3] Evaluate stream-side variables when generating code 
for a bound condition
    
    ### What changes were proposed in this pull request?
    
    This is a back-port of #40766 and #40881.
    
    In `JoinCodegenSupport#getJoinCondition`, evaluate any referenced 
stream-side variables before using them in the generated code.
    
    This patch doesn't evaluate the passed stream-side variables directly, but 
instead evaluates a copy (`streamVars2`). This is because 
`SortMergeJoin#codegenFullOuter` will want to evaluate the stream-side vars 
within a different scope than the condition check, so we mustn't delete the 
initialization code from the original `ExprCode` instances.
    
    ### Why are the changes needed?
    
    When a bound condition of a full outer join references the same stream-side 
column more than once, wholestage codegen generates bad code.
    
    For example, the following query fails with a compilation error:
    
    ```
    create or replace temp view v1 as
    select * from values
    (1, 1),
    (2, 2),
    (3, 1)
    as v1(key, value);
    
    create or replace temp view v2 as
    select * from values
    (1, 22, 22),
    (3, -1, -1),
    (7, null, null)
    as v2(a, b, c);
    
    select *
    from v1
    full outer join v2
    on key = a
    and value > b
    and value > c;
    ```
    The error is:
    ```
    org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
277, Column 9: Redefinition of local variable "smj_isNull_7"
    ```
    The same error occurs with code generated from ShuffleHashJoinExec:
    ```
    select /*+ SHUFFLE_HASH(v2) */ *
    from v1
    full outer join v2
    on key = a
    and value > b
    and value > c;
    ```
    In this case, the error is:
    ```
    org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
174, Column 5: Redefinition of local variable "shj_value_1"
    ```
    Neither `SortMergeJoin#codegenFullOuter` nor 
`ShuffledHashJoinExec#doProduce` evaluate the stream-side variables before 
calling `consumeFullOuterJoinRow#getJoinCondition`. As a result, 
`getJoinCondition` generates definition/initialization code for each referenced 
stream-side variable at the point of use. If a stream-side variable is used 
more than once in the bound condition, the definition/initialization code is 
generated more than once, resulting in the "Redefinition of local variable" error.
    
    In the end, the query succeeds, since Spark disables wholestage codegen and 
tries again.
    
    (In the case of other join-type/strategy pairs, either the implementations 
don't call `JoinCodegenSupport#getJoinCondition`, or the stream-side variables 
are pre-evaluated before the call is made, so no error happens in those cases).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #40917 from bersprockets/full_join_codegen_issue_br33.
    
    Authored-by: Bruce Robbins <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/execution/joins/JoinCodegenSupport.scala   | 12 ++++++--
 .../scala/org/apache/spark/sql/JoinSuite.scala     | 35 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
index 75f0a359a79..a7d1edefcd6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
@@ -42,13 +42,19 @@ trait JoinCodegenSupport extends CodegenSupport with 
BaseJoinExec {
       buildRow: Option[String] = None): (String, String, Seq[ExprCode]) = {
     val buildSideRow = buildRow.getOrElse(ctx.freshName("buildRow"))
     val buildVars = genOneSideJoinVars(ctx, buildSideRow, buildPlan, 
setDefaultValue = false)
+    // We want to evaluate the passed streamVars. However, evaluation modifies 
the contained
+    // ExprCode instances, which may surprise the caller to this method (in 
particular,
+    // full outer join will want to evaluate streamVars in a different scope 
than the
+    // condition check). Because of this, we first make a copy.
+    val streamVars2 = streamVars.map(_.copy())
     val checkCondition = if (condition.isDefined) {
       val expr = condition.get
-      // evaluate the variables from build side that used by condition
-      val eval = evaluateRequiredVariables(buildPlan.output, buildVars, 
expr.references)
+      // evaluate the variables that are used by the condition
+      val eval = evaluateRequiredVariables(streamPlan.output ++ 
buildPlan.output,
+        streamVars2 ++ buildVars, expr.references)
 
       // filter the output via condition
-      ctx.currentVars = streamVars ++ buildVars
+      ctx.currentVars = streamVars2 ++ buildVars
       val ev =
         BindReferences.bindReference(expr, streamPlan.output ++ 
buildPlan.output).genCode(ctx)
       val skipRow = s"${ev.isNull} || !${ev.value}"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 4a8421a2211..286fd5ab957 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1440,4 +1440,39 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
       }
     }
   }
+
+  def dupStreamSideColTest(hint: String, check: SparkPlan => Unit): Unit = {
+    val query =
+      s"""select /*+ ${hint}(r) */ *
+         |from testData2 l
+         |full outer join testData3 r
+         |on l.a = r.a
+         |and l.b < (r.b + 1)
+         |and l.b < (r.a + 1)""".stripMargin
+    val df = sql(query)
+    val plan = df.queryExecution.executedPlan
+    check(plan)
+    val expected = Row(1, 1, null, null) ::
+      Row(1, 2, null, null) ::
+      Row(null, null, 1, null) ::
+      Row(2, 1, 2, 2) ::
+      Row(2, 2, 2, 2) ::
+      Row(3, 1, null, null) ::
+      Row(3, 2, null, null) :: Nil
+    checkAnswer(df, expected)
+  }
+
+  test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
+    def check(plan: SparkPlan): Unit = {
+      assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
+    }
+    dupStreamSideColTest("MERGE", check)
+  }
+
+  test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SHJ)") {
+    def check(plan: SparkPlan): Unit = {
+      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+    }
+    dupStreamSideColTest("SHUFFLE_HASH", check)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to