This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 09cb059c8dde [SPARK-50798][SQL][FOLLOWUP] Further improvements to 
`NormalizePlan`
09cb059c8dde is described below

commit 09cb059c8dde9765d7ac6e4f1b44e2f47b3c1338
Author: Mihailo Timotic <[email protected]>
AuthorDate: Wed Jan 22 10:22:11 2025 +0800

    [SPARK-50798][SQL][FOLLOWUP] Further improvements to `NormalizePlan`
    
    ### What changes were proposed in this pull request?
    Improve `NormalizePlan` by fixing the normalization of `InheritAnalysisRules` 
and adding normalization for `CommonExpressionId` and expressions that use it.
    
    ### Why are the changes needed?
    
    PR #49460 mishandled the normalization of `RuntimeReplaceable` because it 
copied the original expression's tags to the replacement. This PR fixes that.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test case to `NormalizePlanSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49585 from mihailotim-db/mihailotim-db/normalize_plan_followup.
    
    Authored-by: Mihailo Timotic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/plans/NormalizePlan.scala   | 34 +++++++++++++++-------
 .../sql/catalyst/plans/NormalizePlanSuite.scala    | 32 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 13df749c6d58..b98cef04d911 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -22,17 +22,34 @@ import java.util.HashMap
 import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
 import org.apache.spark.sql.catalyst.plans.logical._
 
 object NormalizePlan extends PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val withNormalizedInheritAnalysis = normalizeInheritAnalysisRules(plan)
-    val withNormalizedExprIds = normalizeExprIds(withNormalizedInheritAnalysis)
+    val withNormalizedExpressions = normalizeExpressions(plan)
+    val withNormalizedExprIds = normalizeExprIds(withNormalizedExpressions)
     normalizePlan(withNormalizedExprIds)
   }
 
   /**
-   * Normalize [[InheritAnalysisRules]] nodes by replacing them with their 
replacement expressions.
+   * Normalizes expressions in a plan that either produce non-deterministic 
results or
+   * differ between the fixed-point and single-pass analyzers, due to 
the nature
+   * of bottom-up resolution. Before normalization, pre-process the plan by 
replacing all
+   * [[RuntimeReplaceable]] nodes with their replacements.
+   */
+  def normalizeExpressions(plan: LogicalPlan): LogicalPlan = {
+    val withNormalizedRuntimeReplaceable = normalizeRuntimeReplaceable(plan)
+    withNormalizedRuntimeReplaceable transformAllExpressions {
+      case c: CommonExpressionDef =>
+        c.copy(id = new CommonExpressionId(id = 0))
+      case c: CommonExpressionRef =>
+        c.copy(id = new CommonExpressionId(id = 0))
+    }
+  }
+
+  /**
+   * Normalize [[RuntimeReplaceable]] nodes by replacing them with their 
replacement expressions.
    * This is necessary because fixed-point analyzer may produce 
non-deterministic results when
    * resolving original expressions. For example, in a query like:
    *
@@ -44,15 +61,10 @@ object NormalizePlan extends PredicateHelper {
    * child of initially unresolved function is resolved, the function can be 
converted to
    * [[AssertTrue]], which is of type [[InheritAnalysisRules]]. However, 
because the only child of
    * [[InheritAnalysisRules]] is the replacement expression, the original 
expression will be lost
-   * timezone will never be applied. This causes inconsistencies, because 
fixed-point semantic is
-   * to ALWAYS apply timezone, regardless of whether or not the Cast actually 
needs it.
+   * and timezone will never be applied. This causes inconsistencies, because 
fixed-point semantic
+   * is to ALWAYS apply timezone, regardless of whether the Cast actually 
needs it.
    */
-  def normalizeInheritAnalysisRules(plan: LogicalPlan): LogicalPlan = {
-    plan transformAllExpressions {
-      case inheritAnalysisRules: InheritAnalysisRules =>
-        inheritAnalysisRules.child
-    }
-  }
+  def normalizeRuntimeReplaceable(plan: LogicalPlan): LogicalPlan = 
ReplaceExpressions(plan)
 
   /**
    * Since attribute references are given globally unique ids during analysis,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
index 5ff66098107c..aa2a6408faf0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
@@ -20,7 +20,16 @@ package org.apache.spark.sql.catalyst.plans
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{AssertTrue, Cast, If, 
Literal, TimeZoneAwareExpression}
+import org.apache.spark.sql.catalyst.expressions.{
+  AssertTrue,
+  Cast,
+  CommonExpressionDef,
+  CommonExpressionId,
+  CommonExpressionRef,
+  If,
+  Literal,
+  TimeZoneAwareExpression
+}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.types.BooleanType
 
@@ -70,6 +79,27 @@ class NormalizePlanSuite extends SparkFunSuite with 
SQLConfHelper {
     assert(NormalizePlan(resolvedBaselinePlan) == 
NormalizePlan(resolvedTestPlan))
   }
 
+  test("Normalize CommonExpressionId") {
+    val baselineCommonExpressionRef =
+      CommonExpressionRef(id = new CommonExpressionId, dataType = BooleanType, 
nullable = false)
+    val baselineCommonExpressionDef = CommonExpressionDef(child = Literal(0))
+    val testCommonExpressionRef =
+      CommonExpressionRef(id = new CommonExpressionId, dataType = BooleanType, 
nullable = false)
+    val testCommonExpressionDef = CommonExpressionDef(child = Literal(0))
+
+    val baselinePlanRef = LocalRelation().select(baselineCommonExpressionRef)
+    val testPlanRef = LocalRelation().select(testCommonExpressionRef)
+
+    assert(baselinePlanRef != testPlanRef)
+    assert(NormalizePlan(baselinePlanRef) == NormalizePlan(testPlanRef))
+
+    val baselinePlanDef = LocalRelation().select(baselineCommonExpressionDef)
+    val testPlanDef = LocalRelation().select(testCommonExpressionDef)
+
+    assert(baselinePlanDef != testPlanDef)
+    assert(NormalizePlan(baselinePlanDef) == NormalizePlan(testPlanDef))
+  }
+
   private def setTimezoneForAllExpression(plan: LogicalPlan): LogicalPlan = {
     plan.transformAllExpressions {
       case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to