This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 3061b5007c76 [SPARK-46640][FOLLOW-UP] Consider the whole expression tree when excluding subquery references 3061b5007c76 is described below commit 3061b5007c768882fa401f2f3c1c3860352ad17b Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com> AuthorDate: Tue Apr 15 10:08:14 2025 +0800 [SPARK-46640][FOLLOW-UP] Consider the whole expression tree when excluding subquery references ### What changes were proposed in this pull request? - Context: In `RemoveRedundantAliases`, we exclude `outerAttrs` referenced by subquery expressions when trying to identify aliases that can be removed. Failure to do so could lead to scenarios where certain parts of the plan (for example: the join conditions in an exists subquery) would end up with duplicated expression ID(s). A fix for this was merged in the PR https://github.com/apache/spark/pull/44645. - The fix there only accounted for the case where an element of `plan.expressions` is itself a `SubqueryExpression`. - We can have scenarios where the `SubqueryExpression` is wrapped in other expressions -- for example, `Alias < CaseWhen < (SubqueryExpression)`. - To correctly account for these, we need to traverse the whole expression tree and collect the outer references from every `SubqueryExpression` in it. - Consider plans of the form: ``` Project [CASE WHEN exists#2 [a#1 && (a#1 = a#0)] THEN 1 ELSE 2 END AS result#3] : +- LocalRelation <empty>, [a#0] +- Project [a#0 AS a#1] +- LocalRelation <empty>, [a#0] ``` - The current rule would have rewritten this as follows. Note the join condition with conflicting expression ID(s), `a#0 = a#0`: ``` Project [CASE WHEN exists#2 [a#0 && (a#0 = a#0)] THEN 1 ELSE 2 END AS result#3] : +- LocalRelation <empty>, [a#0] +- LocalRelation <empty>, [a#0] ``` ### Why are the changes needed? - Without this change, queries that run successfully with this rule disabled would fail when it is enabled. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
- Added unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #50570 from nikhilsheoran-db/SPARK-46640-follow-up. Lead-authored-by: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com> Co-authored-by: Nikhil Sheoran <nikhilsheoran...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 898a0b46a0ab40bf201307a5d25f6028775740f2) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 6 ++- .../RemoveRedundantAliasAndProjectSuite.scala | 60 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9d269f37e58b..aa972c815591 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -671,8 +671,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { val subQueryAttributes = if (conf.getConf(SQLConf .EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES)) { // Collect the references for all the subquery expressions in the plan. 
- AttributeSet.fromAttributeSets(plan.expressions.collect { - case e: SubqueryExpression => e.references + AttributeSet.fromAttributeSets(plan.expressions.flatMap { e => + e.collect { + case s: SubqueryExpression => s.references + } }) } else { AttributeSet.empty diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala index 8a0a0466ca74..552a638f6e61 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala @@ -178,4 +178,64 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest { comparePlans(optimized, expectedWhenNotExcluded) } } + + test("SPARK-46640: exclude outer references accounts for children of plan expression") { + val a = $"a".int + val a_alias = Alias(a, "a")() + val a_alias_attr = a_alias.toAttribute + + // The original input query + // Project [CASE WHEN exists#2 [a#1 && (a#1 = a#0)] THEN 1 ELSE 2 END AS result#3] + // : +- LocalRelation <empty>, [a#0] + // +- Project [a#0 AS a#1] + // +- LocalRelation <empty>, [a#0] + // The subquery expression (`exists#2`) is wrapped in a CaseWhen and an Alias. + // Without the fix on excluding outer references, the rewritten plan would have been: + // Project [CASE WHEN exists#2 [a#0 && (a#0 = a#0)] THEN 1 ELSE 2 END AS result#3] + // : +- LocalRelation <empty>, [a#0] + // +- LocalRelation <empty>, [a#0] + // This plan would then fail later with the error -- conflicting a#0 in join condition. 
+ + val query = Project(Seq( + Alias( + CaseWhen(Seq(( + Exists( + LocalRelation(a), + outerAttrs = Seq(a_alias_attr), + joinCond = Seq(EqualTo(a_alias_attr, a)) + ), Literal(1))), + Some(Literal(2))), + "result" + )()), + Project(Seq(a_alias), LocalRelation(a)) + ) + + // The alias would not be removed if excluding subquery references is enabled. + val expectedWhenExcluded = query + + // The alias would be removed and we would have conflicting expression ID(s) in the join cond + val expectedWhenNotEnabled = Project(Seq( + Alias( + CaseWhen(Seq(( + Exists( + LocalRelation(a), + outerAttrs = Seq(a), + joinCond = Seq(EqualTo(a, a)) + ), Literal(1))), + Some(Literal(2))), + "result" + )()), + LocalRelation(a) + ) + + withSQLConf(SQLConf.EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES.key -> "true") { + val optimized = Optimize.execute(query) + comparePlans(optimized, expectedWhenExcluded) + } + + withSQLConf(SQLConf.EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES.key -> "false") { + val optimized = Optimize.execute(query) + comparePlans(optimized, expectedWhenNotEnabled) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org