This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1112a870e0d7 [SPARK-52783][SQL] Refactor windowFunction validation logic from checkAnalysis for reuse in single-pass analyzer 1112a870e0d7 is described below commit 1112a870e0d7439953eb48ca689f5b4b68d29de5 Author: Nikola Jovićević <nikola.jovice...@databricks.com> AuthorDate: Wed Jul 16 14:45:07 2025 +0800 [SPARK-52783][SQL] Refactor windowFunction validation logic from checkAnalysis for reuse in single-pass analyzer ### What changes were proposed in this pull request? Refactor existing logic for `windowFunction` validation by extracting its implementation from `checkAnalysis` into `WindowResolution.validateResolvedWindowExpression` method. ### Why are the changes needed? The same repeating logic is required both in the fixed-point and single-pass analyzer implementations, thus extracting it into an object ensures reusable and modular structure. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51496 from nikola-jovicevic-db/SPARK-52783. 
Authored-by: Nikola Jovićević <nikola.jovice...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CheckAnalysis.scala | 33 +------------ .../sql/catalyst/analysis/WindowResolution.scala | 55 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 5a58c24bc190..25ae710eeebb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.analysis.ResolveWithCTE.checkIfSelfReferenceIsPlacedCorrectly import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ListAgg} import org.apache.spark.sql.catalyst.optimizer.InlineCTE import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, PLAN_EXPRESSION, UNRESOLVED_WINDOW_EXPRESSION} @@ -437,11 +437,6 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString errorClass = "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE", messageParameters = Map("funcName" -> toSQLExpr(w))) - case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) => - w.failAnalysis( - errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", - messageParameters = Map("windowExpr" -> toSQLExpr(w))) - case w @ WindowExpression(wf: FrameLessOffsetWindowFunction, 
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame)) if order.isEmpty || !frame.isOffset => @@ -457,31 +452,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString listAgg.prettyName, listAgg.child, listAgg.orderExpressions) case w: WindowExpression => - // Only allow window functions with an aggregate expression or an offset window - // function or a Pandas window UDF. - w.windowFunction match { - case agg @ AggregateExpression(fun: ListAgg, _, _, _, _) - // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is unsupported - if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty || - w.windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => - agg.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction))) - case agg @ AggregateExpression( - _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _) - if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => - agg.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction))) - case _: AggregateExpression | _: FrameLessOffsetWindowFunction | - _: AggregateWindowFunction => // OK - case other => - other.failAnalysis( - errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", - messageParameters = Map("sqlExpr" -> toSQLExpr(other))) - } + WindowResolution.validateResolvedWindowExpression(w) case s: SubqueryExpression => checkSubqueryExpression(operator, s) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala index 9a48c24d709c..5f69865b3532 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.{ + AggregateWindowFunction, CurrentRow, Expression, FrameLessOffsetWindowFunction, @@ -32,6 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.{ WindowFunction, WindowSpecDefinition } +import org.apache.spark.sql.catalyst.expressions.aggregate.{ + AggregateExpression, + ListAgg, + Median, + PercentileCont, + PercentileDisc +} +import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.errors.QueryCompilationErrors /** @@ -92,4 +101,50 @@ object WindowResolution { case e => e } + + /** + * Validates a resolved [[WindowExpression]] to ensure it conforms to the allowed constraints. + * + * By checking the type and configuration of [[WindowExpression.windowFunction]] it enforces the + * following rules: + * - Disallows distinct aggregate expressions in window functions. + * - Disallows use of certain aggregate functions - [[ListAgg]], [[PercentileCont]], + * [[PercentileDisc]], [[Median]] + * - Allows only window functions of the following types: + * - [[AggregateExpression]] (non-distinct) + * - [[FrameLessOffsetWindowFunction]] + * - [[AggregateWindowFunction]] + */ + def validateResolvedWindowExpression(windowExpression: WindowExpression): Unit = { + windowExpression.windowFunction match { + case AggregateExpression(_, _, true, _, _) => + windowExpression.failAnalysis( + errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", + messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression)) + ) + case agg @ AggregateExpression(fun: ListAgg, _, _, _, _) + // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) 
is unsupported + if fun.orderingFilled && (windowExpression.windowSpec.orderSpec.nonEmpty || + windowExpression.windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => + agg.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) + ) + case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _) + if windowExpression.windowSpec.orderSpec.nonEmpty || + windowExpression.windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => + agg.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) + ) + case _: AggregateExpression | _: FrameLessOffsetWindowFunction | _: AggregateWindowFunction => // OK + case other => + other.failAnalysis( + errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", + messageParameters = Map("sqlExpr" -> toSQLExpr(other)) + ) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org