This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9677b44e7de8 [SPARK-52727][SQL] Refactor Window resolution in order to reuse it in single-pass analyzer
9677b44e7de8 is described below

commit 9677b44e7de81e642c1b6fe0655503e9556fc3be
Author: Nikola Jovićević <nikola.jovice...@your.hostname.com>
AuthorDate: Fri Jul 11 20:30:55 2025 +0800

    [SPARK-52727][SQL] Refactor Window resolution in order to reuse it in single-pass analyzer

    ### What changes were proposed in this pull request?

    Refactor the existing logic of the `ResolveWindowFrame` and `ResolveWindowOrder` rules by extracting their implementations into a separate `WindowResolution` object.

    ### Why are the changes needed?

    The same logic is required in both the fixed-point and single-pass analyzer implementations; extracting it into a shared object keeps it reusable and modular.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51433 from nikola-jovicevic-db/SPARK-52727.

    Authored-by: Nikola Jovićević <nikola.jovice...@your.hostname.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 24 +-----
 .../sql/catalyst/analysis/WindowResolution.scala   | 95 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 78ae9c8afe46..964a9d2ef0b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3632,23 +3632,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object ResolveWindowFrame extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
       _.containsPattern(WINDOW_EXPRESSION), ruleId) {
-      case WindowExpression(wf: FrameLessOffsetWindowFunction,
-        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f =>
-        throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
-      case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
-          if wf.frame != UnspecifiedFrame && wf.frame != f =>
-        throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame)
-      case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, UnspecifiedFrame))
-          if wf.frame != UnspecifiedFrame =>
-        WindowExpression(wf, s.copy(frameSpecification = wf.frame))
-      case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame))
-          if e.resolved =>
-        val frame = if (o.nonEmpty) {
-          SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
-        } else {
-          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
-        }
-        we.copy(windowSpec = s.copy(frameSpecification = frame))
+      case we => WindowResolution.resolveFrame(we)
     }
   }

@@ -3658,11 +3642,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object ResolveWindowOrder extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
       _.containsPattern(WINDOW_EXPRESSION), ruleId) {
-      case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
-        throw QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
-      case WindowExpression(rank: RankLike, spec) if spec.resolved =>
-        val order = spec.orderSpec.map(_.child)
-        WindowExpression(rank.withOrder(order), spec)
+      case we => WindowResolution.resolveOrder(we)
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
new file mode 100644
index 000000000000..9a48c24d709c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{
+  CurrentRow,
+  Expression,
+  FrameLessOffsetWindowFunction,
+  RangeFrame,
+  RankLike,
+  RowFrame,
+  SpecifiedWindowFrame,
+  UnboundedFollowing,
+  UnboundedPreceding,
+  UnspecifiedFrame,
+  WindowExpression,
+  WindowFunction,
+  WindowSpecDefinition
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Utility object for resolving [[WindowExpression]].
+ *
+ * It ensures that window frame definitions and order specs are consistent between the
+ * [[WindowFunction]] and [[WindowSpecDefinition]], throwing errors if configurations are
+ * incompatible or missing.
+ */
+object WindowResolution {
+
+  /**
+   * Validates the window frame of a [[WindowExpression]].
+   *
+   * It enforces that the frame in [[WindowExpression.windowFunction]] matches the frame
+   * in [[WindowExpression.windowSpec]], or provides a default frame when it
+   * is unspecified.
+   */
+  def resolveFrame(expression: Expression): Expression = expression match {
+    case WindowExpression(
+        wf: FrameLessOffsetWindowFunction,
+        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)
+      ) if wf.frame != f =>
+      throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
+
+    case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
+        if wf.frame != UnspecifiedFrame && wf.frame != f =>
+      throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame)
+
+    case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, UnspecifiedFrame))
+        if wf.frame != UnspecifiedFrame =>
+      WindowExpression(wf, s.copy(frameSpecification = wf.frame))
+
+    case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) if e.resolved =>
+      val frame = if (o.nonEmpty) {
+        SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+      } else {
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+      }
+      we.copy(windowSpec = s.copy(frameSpecification = frame))
+
+    case e => e
+  }
+
+  /**
+   * Ensures that [[WindowExpression.windowSpec.orderSpec]] is not missing.
+   *
+   * In case of [[RankLike]] window functions, it attaches the resolved order to the
+   * function to finalize it.
+   */
+  def resolveOrder(expression: Expression): Expression = expression match {
+    case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
+      throw QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
+
+    case WindowExpression(rank: RankLike, spec) if spec.resolved =>
+      val order = spec.orderSpec.map(_.child)
+      WindowExpression(rank.withOrder(order), spec)
+
+    case e => e
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
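
P.S. For context on the single-pass reuse mentioned in the description, a minimal sketch of how a single-pass resolver could delegate to the extracted object. The `SinglePassWindowResolver` object and its `resolve` method are illustrative assumptions, not part of this commit; only the two `WindowResolution` calls come from the patch.

```scala
import org.apache.spark.sql.catalyst.analysis.WindowResolution
import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression}

// Hypothetical single-pass entry point: instead of running two fixed-point
// rules, apply the order check and then the frame check to the same
// window expression in a single visit.
object SinglePassWindowResolver {
  def resolve(we: WindowExpression): Expression =
    WindowResolution.resolveFrame(WindowResolution.resolveOrder(we))
}
```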