This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a75e12f96a1 [SPARK-51228][SQL] Introduce subquery normalization to 
NormalizePlan
8a75e12f96a1 is described below

commit 8a75e12f96a1129da3dfcfbaf62b5992092baa14
Author: Vladimir Golubev <[email protected]>
AuthorDate: Mon Feb 17 19:11:38 2025 +0800

    [SPARK-51228][SQL] Introduce subquery normalization to NormalizePlan
    
    ### What changes were proposed in this pull request?
    
    Introduce subquery normalization to NormalizePlan.
    
    Also, perform bottom-up normalization for plans - predicate children have 
to be normalized first, because they are reordered by `hashCode()` value.
    
    ### Why are the changes needed?
    
    This is important for single-pass Analyzer to correctly compare plans with 
In/Exists/Scalar subqueries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49970 from vladimirg-db/vladimirg-db/normalize-subqueries-as-well.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/catalyst/plans/NormalizePlan.scala      | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index a13650da2472..62ef65eb1112 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -40,7 +40,10 @@ object NormalizePlan extends PredicateHelper {
    */
   def normalizeExpressions(plan: LogicalPlan): LogicalPlan = {
     val withNormalizedRuntimeReplaceable = normalizeRuntimeReplaceable(plan)
-    withNormalizedRuntimeReplaceable transformAllExpressions {
+    withNormalizedRuntimeReplaceable.transformAllExpressions {
+      case subqueryExpression: SubqueryExpression =>
+        val normalizedPlan = normalizeExpressions(subqueryExpression.plan)
+        subqueryExpression.withNewPlan(normalizedPlan)
       case commonExpressionDef: CommonExpressionDef =>
         commonExpressionDef.copy(id = new CommonExpressionId(id = 0))
       case commonExpressionRef: CommonExpressionRef =>
@@ -73,7 +76,7 @@ object NormalizePlan extends PredicateHelper {
    * we must normalize them to check if two different queries are identical.
    */
   def normalizeExprIds(plan: LogicalPlan): LogicalPlan = {
-    plan transformAllExpressions {
+    plan.transformAllExpressions {
       case s: ScalarSubquery =>
         s.copy(plan = normalizeExprIds(s.plan), exprId = ExprId(0))
       case s: LateralSubquery =>
@@ -117,7 +120,7 @@ object NormalizePlan extends PredicateHelper {
    */
   def normalizePlan(plan: LogicalPlan): LogicalPlan = {
     val cteIdNormalizer = new CteIdNormalizer
-    plan transform {
+    plan.transformUpWithSubqueries {
       case Filter(condition: Expression, child: LogicalPlan) =>
         Filter(
           splitConjunctivePredicates(condition)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to