This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8915c6078527 [SPARK-52705][SQL] Refactor deterministic check for grouping expressions 8915c6078527 is described below commit 8915c6078527c14d1d6086ed94e21bac74ba64dc Author: Mihailo Timotic <mihailo.timo...@databricks.com> AuthorDate: Tue Jul 8 18:32:42 2025 +0800 [SPARK-52705][SQL] Refactor deterministic check for grouping expressions ### What changes were proposed in this pull request? Move check for non-deterministic expressions in grouping expressions from `ExprUtils` to `CheckAnalysis`. ### Why are the changes needed? This is necessary in order to be able to utilize the `PullOutNondeterministic` rule as a post-processing rewrite rule in the single-pass analyzer. Because `ExprUtils.assertValidAggregation` is called during the bottom-up traversal, we can't check for non-deterministic expressions there. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51391 from mihailotim-db/mihailotim-db/pull_out_nondeterministic. 
Authored-by: Mihailo Timotic <mihailo.timo...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 14 +++++++++++++- .../sql/catalyst/analysis/PullOutNondeterministic.scala | 14 +++++++++++++- .../apache/spark/sql/catalyst/expressions/ExprUtils.scala | 12 ------------ 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 863398de9cc9..5a58c24bc190 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -569,7 +569,19 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString messageParameters = Map.empty) } - case a: Aggregate => ExprUtils.assertValidAggregation(a) + case a: Aggregate => + a.groupingExpressions.foreach( + expression => + if (!expression.deterministic) { + throw SparkException.internalError( + msg = s"Non-deterministic expression '${toSQLExpr(expression)}' should not " + + "appear in grouping expression.", + context = expression.origin.getQueryContext, + summary = expression.origin.context.summary + ) + } + ) + ExprUtils.assertValidAggregation(a) case CollectMetrics(name, metrics, _, _) => if (name == null || name.isEmpty) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala index e0b984540cac..6769babdd1f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala @@ -19,7 +19,9 @@ package 
org.apache.spark.sql.catalyst.analysis import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.ExprUtils.toSQLExpr import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule @@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] { val nondeterToAttr = NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions) val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child) - a.transformExpressions { case e => + val deterministicAggregate = a.transformExpressions { case e => Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e) }.copy(child = newChild) + deterministicAggregate.groupingExpressions.foreach(expr => if (!expr.deterministic) { + throw SparkException.internalError( + msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " + + "grouping expression.", + context = expr.origin.getQueryContext, + summary = expr.origin.context.summary) + }) + + deterministicAggregate + // Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail) // and we want to retain them inside the aggregate functions. 
case m: CollectMetrics => m diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala index 8b7d641828ba..783de160f83b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition} import java.util.Locale -import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} @@ -209,17 +208,6 @@ object ExprUtils extends EvalHelper with QueryErrorsBase { "sqlExpr" -> toSQLExpr(expr), "dataType" -> toSQLType(expr.dataType))) } - - if (!expr.deterministic) { - // This is just a sanity check, our analysis rule PullOutNondeterministic should - // already pull out those nondeterministic expressions and evaluate them in - // a Project node. - throw SparkException.internalError( - msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " + - "grouping expression.", - context = expr.origin.getQueryContext, - summary = expr.origin.context.summary) - } } a.groupingExpressions.foreach(checkValidGroupingExprs) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org