This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3061b5007c76 [SPARK-46640][FOLLOW-UP] Consider the whole expression 
tree when excluding subquery references
3061b5007c76 is described below

commit 3061b5007c768882fa401f2f3c1c3860352ad17b
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Tue Apr 15 10:08:14 2025 +0800

    [SPARK-46640][FOLLOW-UP] Consider the whole expression tree when excluding 
subquery references
    
    ### What changes were proposed in this pull request?
    
    - Context: In `RemoveRedundantAliases`, we exclude the `outerAttrs` referenced 
by subquery expressions when identifying aliases that can be removed. 
Failing to do so can leave certain parts of the plan (for example, the join 
conditions of an EXISTS subquery) with duplicated expression IDs. A fix for 
this was merged in 
https://github.com/apache/spark/pull/44645.
    - That fix only handled the case where an expression in `plan.expressions` 
is itself a `SubqueryExpression`.
    - However, a `SubqueryExpression` can be nested inside other expressions. 
For example, it can appear inside a `CaseWhen` that is itself wrapped in an `Alias`.
    - To handle these cases correctly, we need to traverse the whole expression 
tree and collect the references of every `SubqueryExpression` found in it.
    
    - Consider plans of the form:
    
    ```
    Project [CASE WHEN exists#2 [a#1 && (a#1 = a#0)] THEN 1 ELSE 2 END AS 
result#3]
    :  +- LocalRelation <empty>, [a#0]
    +- Project [a#0 AS a#1]
      +- LocalRelation <empty>, [a#0]
    ```
    
    - Without this fix, the rule rewrites this plan as follows. Note the join 
condition with conflicting expression IDs, `a#0 = a#0`:
    
    ```
    Project [CASE WHEN exists#2 [a#0 && (a#0 = a#0)] THEN 1 ELSE 2 END AS 
result#3]
    :  +- LocalRelation <empty>, [a#0]
    +- LocalRelation <empty>, [a#0]
    ```
    
    ### Why are the changes needed?
    - Queries that run successfully with this rule disabled fail when it is 
enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Added unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50570 from nikhilsheoran-db/SPARK-46640-follow-up.
    
    Lead-authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
    Co-authored-by: Nikhil Sheoran <nikhilsheoran...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 898a0b46a0ab40bf201307a5d25f6028775740f2)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 ++-
 .../RemoveRedundantAliasAndProjectSuite.scala      | 60 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9d269f37e58b..aa972c815591 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -671,8 +671,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         val subQueryAttributes = if (conf.getConf(SQLConf
           .EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES)) {
           // Collect the references for all the subquery expressions in the 
plan.
-          AttributeSet.fromAttributeSets(plan.expressions.collect {
-            case e: SubqueryExpression => e.references
+          AttributeSet.fromAttributeSets(plan.expressions.flatMap { e =>
+            e.collect {
+              case s: SubqueryExpression => s.references
+            }
           })
         } else {
           AttributeSet.empty
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 8a0a0466ca74..552a638f6e61 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -178,4 +178,64 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest 
{
       comparePlans(optimized, expectedWhenNotExcluded)
     }
   }
+
+  test("SPARK-46640: exclude outer references accounts for children of plan 
expression") {
+    val a = $"a".int
+    val a_alias = Alias(a, "a")()
+    val a_alias_attr = a_alias.toAttribute
+
+    // The original input query
+    //  Project [CASE WHEN exists#2 [a#1 && (a#1 = a#0)] THEN 1 ELSE 2 END AS 
result#3]
+    //  :  +- LocalRelation <empty>, [a#0]
+    //  +- Project [a#0 AS a#1]
+    //    +- LocalRelation <empty>, [a#0]
+    // The subquery expression (`exists#2`) is wrapped in a CaseWhen and an 
Alias.
+    // Without the fix on excluding outer references, the rewritten plan would 
have been:
+    //  Project [CASE WHEN exists#2 [a#0 && (a#0 = a#0)] THEN 1 ELSE 2 END AS 
result#3]
+    //  :  +- LocalRelation <empty>, [a#0]
+    //  +- LocalRelation <empty>, [a#0]
+    // This plan would then fail later with the error -- conflicting a#0 in 
join condition.
+
+    val query = Project(Seq(
+      Alias(
+        CaseWhen(Seq((
+          Exists(
+            LocalRelation(a),
+            outerAttrs = Seq(a_alias_attr),
+            joinCond = Seq(EqualTo(a_alias_attr, a))
+          ), Literal(1))),
+          Some(Literal(2))),
+        "result"
+      )()),
+      Project(Seq(a_alias), LocalRelation(a))
+    )
+
+    // The alias would not be removed if excluding subquery references is 
enabled.
+    val expectedWhenExcluded = query
+
+    // The alias would be removed and we would have conflicting expression 
ID(s) in the join cond
+    val expectedWhenNotEnabled = Project(Seq(
+      Alias(
+        CaseWhen(Seq((
+          Exists(
+            LocalRelation(a),
+            outerAttrs = Seq(a),
+            joinCond = Seq(EqualTo(a, a))
+          ), Literal(1))),
+          Some(Literal(2))),
+        "result"
+      )()),
+      LocalRelation(a)
+    )
+
+    
withSQLConf(SQLConf.EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES.key 
-> "true") {
+      val optimized = Optimize.execute(query)
+      comparePlans(optimized, expectedWhenExcluded)
+    }
+
+    
withSQLConf(SQLConf.EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES.key 
-> "false") {
+      val optimized = Optimize.execute(query)
+      comparePlans(optimized, expectedWhenNotEnabled)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to