This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9677b44e7de8 [SPARK-52727][SQL] Refactor Window resolution in order to 
reuse it in single-pass analyzer
9677b44e7de8 is described below

commit 9677b44e7de81e642c1b6fe0655503e9556fc3be
Author: Nikola Jovićević <nikola.jovice...@your.hostname.com>
AuthorDate: Fri Jul 11 20:30:55 2025 +0800

    [SPARK-52727][SQL] Refactor Window resolution in order to reuse it in 
single-pass analyzer
    
    ### What changes were proposed in this pull request?
    
    Refactor existing logic for `ResolveWindowFrame` and `ResolveWindowOrder` 
rules by extracting their implementations into separate `WindowResolution` 
object.
    
    ### Why are the changes needed?
    
    The same repeating logic is required both in the fixed-point and 
single-pass analyzer implementations, thus extracting it into an object ensures 
reusable and modular structure.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51433 from nikola-jovicevic-db/SPARK-52727.
    
    Authored-by: Nikola Jovićević <nikola.jovice...@your.hostname.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 24 +-----
 .../sql/catalyst/analysis/WindowResolution.scala   | 95 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 78ae9c8afe46..964a9d2ef0b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3632,23 +3632,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   object ResolveWindowFrame extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveExpressionsWithPruning(
       _.containsPattern(WINDOW_EXPRESSION), ruleId) {
-      case WindowExpression(wf: FrameLessOffsetWindowFunction,
-        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f 
=>
-        throw 
QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
-      case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: 
SpecifiedWindowFrame))
-          if wf.frame != UnspecifiedFrame && wf.frame != f =>
-        throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, 
wf.frame)
-      case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, 
UnspecifiedFrame))
-          if wf.frame != UnspecifiedFrame =>
-        WindowExpression(wf, s.copy(frameSpecification = wf.frame))
-      case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, 
UnspecifiedFrame))
-          if e.resolved =>
-        val frame = if (o.nonEmpty) {
-          SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
-        } else {
-          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
-        }
-        we.copy(windowSpec = s.copy(frameSpecification = frame))
+      case we => WindowResolution.resolveFrame(we)
     }
   }
 
@@ -3658,11 +3642,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   object ResolveWindowOrder extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveExpressionsWithPruning(
       _.containsPattern(WINDOW_EXPRESSION), ruleId) {
-      case WindowExpression(wf: WindowFunction, spec) if 
spec.orderSpec.isEmpty =>
-        throw 
QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
-      case WindowExpression(rank: RankLike, spec) if spec.resolved =>
-        val order = spec.orderSpec.map(_.child)
-        WindowExpression(rank.withOrder(order), spec)
+      case we => WindowResolution.resolveOrder(we)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
new file mode 100644
index 000000000000..9a48c24d709c
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{
+  CurrentRow,
+  Expression,
+  FrameLessOffsetWindowFunction,
+  RangeFrame,
+  RankLike,
+  RowFrame,
+  SpecifiedWindowFrame,
+  UnboundedFollowing,
+  UnboundedPreceding,
+  UnspecifiedFrame,
+  WindowExpression,
+  WindowFunction,
+  WindowSpecDefinition
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Utility object for resolving [[WindowExpression]].
+ *
+ * It ensures that window frame definitions and order specs are consistent 
between the
+ * [[WindowFunction]] and [[WindowSpecDefinition]], throwing errors if 
configurations are
+ * incompatible or missing.
+ */
+object WindowResolution {
+
+  /**
+   * Validates the window frame of a [[WindowExpression]].
+   *
+   * It enforces that the frame in [[WindowExpression.windowFunction]] matches 
the frame
+   * in [[WindowExpression.windowSpec]], alternatively it provides a default 
frame when it
+   * is unspecified.
+   */
+  def resolveFrame(expression: Expression): Expression = expression match {
+    case WindowExpression(
+        wf: FrameLessOffsetWindowFunction,
+        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)
+        ) if wf.frame != f =>
+      throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
+
+    case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: 
SpecifiedWindowFrame))
+        if wf.frame != UnspecifiedFrame && wf.frame != f =>
+      throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, 
wf.frame)
+
+    case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, 
UnspecifiedFrame))
+        if wf.frame != UnspecifiedFrame =>
+      WindowExpression(wf, s.copy(frameSpecification = wf.frame))
+
+    case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, 
UnspecifiedFrame)) if e.resolved =>
+      val frame = if (o.nonEmpty) {
+        SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+      } else {
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+      }
+      we.copy(windowSpec = s.copy(frameSpecification = frame))
+
+    case e => e
+  }
+
+  /**
+   * Ensures that [[WindowExpression.windowSpec.orderSpec]] is not missing.
+   *
+   * In case of [[RankLike]] window functions, it attaches the resolved order 
to the
+   * function to finalize it.
+   */
+  def resolveOrder(expression: Expression): Expression = expression match {
+    case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty 
=>
+      throw 
QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
+
+    case WindowExpression(rank: RankLike, spec) if spec.resolved =>
+      val order = spec.orderSpec.map(_.child)
+      WindowExpression(rank.withOrder(order), spec)
+
+    case e => e
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to