This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8915c6078527 [SPARK-52705][SQL] Refactor deterministic check for 
grouping expressions
8915c6078527 is described below

commit 8915c6078527c14d1d6086ed94e21bac74ba64dc
Author: Mihailo Timotic <mihailo.timo...@databricks.com>
AuthorDate: Tue Jul 8 18:32:42 2025 +0800

    [SPARK-52705][SQL] Refactor deterministic check for grouping expressions
    
    ### What changes were proposed in this pull request?
    
    Move the check for non-deterministic grouping expressions from 
`ExprUtils.assertValidAggregation` to `CheckAnalysis` and 
`PullOutNondeterministic`.
    ### Why are the changes needed?
    This is necessary in order to utilize the `PullOutNondeterministic` rule 
as a post-processing rewrite rule in the single-pass analyzer. Because 
`ExprUtils.assertValidAggregation` is called during the bottom-up traversal, 
we can't check for non-deterministic expressions there.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51391 from mihailotim-db/mihailotim-db/pull_out_nondeterministic.
    
    Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 14 +++++++++++++-
 .../sql/catalyst/analysis/PullOutNondeterministic.scala    | 14 +++++++++++++-
 .../apache/spark/sql/catalyst/expressions/ExprUtils.scala  | 12 ------------
 3 files changed, 26 insertions(+), 14 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 863398de9cc9..5a58c24bc190 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -569,7 +569,19 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
                 messageParameters = Map.empty)
             }
 
-          case a: Aggregate => ExprUtils.assertValidAggregation(a)
+          case a: Aggregate =>
+            a.groupingExpressions.foreach(
+              expression =>
+                if (!expression.deterministic) {
+                  throw SparkException.internalError(
+                    msg = s"Non-deterministic expression 
'${toSQLExpr(expression)}' should not " +
+                      "appear in grouping expression.",
+                    context = expression.origin.getQueryContext,
+                    summary = expression.origin.context.summary
+                  )
+                }
+            )
+            ExprUtils.assertValidAggregation(a)
 
           case CollectMetrics(name, metrics, _, _) =>
             if (name == null || name.isEmpty) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
index e0b984540cac..6769babdd1f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.ExprUtils.toSQLExpr
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
       val nondeterToAttr =
         
NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions)
       val newChild = Project(a.child.output ++ 
nondeterToAttr.values.asScala.toSeq, a.child)
-      a.transformExpressions { case e =>
+      val deterministicAggregate = a.transformExpressions { case e =>
         Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
       }.copy(child = newChild)
 
+      deterministicAggregate.groupingExpressions.foreach(expr => if 
(!expr.deterministic) {
+        throw SparkException.internalError(
+          msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not 
appear in " +
+            "grouping expression.",
+          context = expr.origin.getQueryContext,
+          summary = expr.origin.context.summary)
+      })
+
+      deterministicAggregate
+
     // Don't touch collect metrics. Top-level metrics are not supported (check 
analysis will fail)
     // and we want to retain them inside the aggregate functions.
     case m: CollectMetrics => m
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 8b7d641828ba..783de160f83b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
 import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
 import java.util.Locale
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
@@ -209,17 +208,6 @@ object ExprUtils extends EvalHelper with QueryErrorsBase {
             "sqlExpr" -> toSQLExpr(expr),
             "dataType" -> toSQLType(expr.dataType)))
       }
-
-      if (!expr.deterministic) {
-        // This is just a sanity check, our analysis rule 
PullOutNondeterministic should
-        // already pull out those nondeterministic expressions and evaluate 
them in
-        // a Project node.
-        throw SparkException.internalError(
-          msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not 
appear in " +
-            "grouping expression.",
-          context = expr.origin.getQueryContext,
-          summary = expr.origin.context.summary)
-      }
     }
 
     a.groupingExpressions.foreach(checkValidGroupingExprs)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to