This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1112a870e0d7 [SPARK-52783][SQL] Refactor windowFunction validation 
logic from checkAnalysis for reuse in single-pass analyzer
1112a870e0d7 is described below

commit 1112a870e0d7439953eb48ca689f5b4b68d29de5
Author: Nikola Jovićević <nikola.jovice...@databricks.com>
AuthorDate: Wed Jul 16 14:45:07 2025 +0800

    [SPARK-52783][SQL] Refactor windowFunction validation logic from 
checkAnalysis for reuse in single-pass analyzer
    
    ### What changes were proposed in this pull request?
    
    Refactor existing logic for `windowFunction` validation by extracting its 
implementation from `checkAnalysis` into 
`WindowResolution.validateResolvedWindowExpression` method.
    
    ### Why are the changes needed?
    
    The same repeating logic is required both in the fixed-point and 
single-pass analyzer implementations, thus extracting it into an object ensures 
reusable and modular structure.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51496 from nikola-jovicevic-db/SPARK-52783.
    
    Authored-by: Nikola Jovićević <nikola.jovice...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 33 +------------
 .../sql/catalyst/analysis/WindowResolution.scala   | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 31 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5a58c24bc190..25ae710eeebb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import 
org.apache.spark.sql.catalyst.analysis.ResolveWithCTE.checkIfSelfReferenceIsPlacedCorrectly
 import org.apache.spark.sql.catalyst.expressions._
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, ListAgg}
 import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE,
 PLAN_EXPRESSION, UNRESOLVED_WINDOW_EXPRESSION}
@@ -437,11 +437,6 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
               errorClass = "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
               messageParameters = Map("funcName" -> toSQLExpr(w)))
 
-          case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) 
=>
-            w.failAnalysis(
-              errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
-              messageParameters = Map("windowExpr" -> toSQLExpr(w)))
-
           case w @ WindowExpression(wf: FrameLessOffsetWindowFunction,
             WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
              if order.isEmpty || !frame.isOffset =>
@@ -457,31 +452,7 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
               listAgg.prettyName, listAgg.child, listAgg.orderExpressions)
 
           case w: WindowExpression =>
-            // Only allow window functions with an aggregate expression or an 
offset window
-            // function or a Pandas window UDF.
-            w.windowFunction match {
-              case agg @ AggregateExpression(fun: ListAgg, _, _, _, _)
-                // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY 
...) is unsupported
-                if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty ||
-                  w.windowSpec.frameSpecification !=
-                  SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing)) =>
-                agg.failAnalysis(
-                  errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
-                  messageParameters = Map("aggFunc" -> 
toSQLExpr(agg.aggregateFunction)))
-              case agg @ AggregateExpression(
-                _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
-                if w.windowSpec.orderSpec.nonEmpty || 
w.windowSpec.frameSpecification !=
-                    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing) =>
-                agg.failAnalysis(
-                  errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
-                  messageParameters = Map("aggFunc" -> 
toSQLExpr(agg.aggregateFunction)))
-              case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
-                  _: AggregateWindowFunction => // OK
-              case other =>
-                other.failAnalysis(
-                  errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW",
-                  messageParameters = Map("sqlExpr" -> toSQLExpr(other)))
-            }
+            WindowResolution.validateResolvedWindowExpression(w)
 
           case s: SubqueryExpression =>
             checkSubqueryExpression(operator, s)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
index 9a48c24d709c..5f69865b3532 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.expressions.{
+  AggregateWindowFunction,
   CurrentRow,
   Expression,
   FrameLessOffsetWindowFunction,
@@ -32,6 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.{
   WindowFunction,
   WindowSpecDefinition
 }
+import org.apache.spark.sql.catalyst.expressions.aggregate.{
+  AggregateExpression,
+  ListAgg,
+  Median,
+  PercentileCont,
+  PercentileDisc
+}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
@@ -92,4 +101,50 @@ object WindowResolution {
 
     case e => e
   }
+
+  /**
+   * Validates a resolved [[WindowExpression]] to ensure it conforms to the 
allowed constraints.
+   *
+   * By checking the type and configuration of 
[[WindowExpression.windowFunction]] it enforces the
+   * following rules:
+   * - Disallows distinct aggregate expressions in window functions.
+   * - Disallows use of certain aggregate functions - [[ListaAgg]], 
[[PercentileCont]],
+   *   [[PercentileDisc]], [[Median]]
+   * - Allows only window functions of following types:
+   *   - [[AggregateExpression]] (non-distinct)
+   *   - [[FrameLessOffsetWindowFunction]]
+   *   - [[AggregateWindowFunction]]
+   */
+  def validateResolvedWindowExpression(windowExpression: WindowExpression): 
Unit = {
+    windowExpression.windowFunction match {
+      case AggregateExpression(_, _, true, _, _) =>
+        windowExpression.failAnalysis(
+          errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
+          messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression))
+        )
+      case agg @ AggregateExpression(fun: ListAgg, _, _, _, _)
+        // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is 
unsupported
+        if fun.orderingFilled && 
(windowExpression.windowSpec.orderSpec.nonEmpty ||
+          windowExpression.windowSpec.frameSpecification !=
+            SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing)) =>
+        agg.failAnalysis(
+          errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+          messageParameters = Map("aggFunc" -> 
toSQLExpr(agg.aggregateFunction))
+        )
+      case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc | 
_: Median, _, _, _, _)
+        if windowExpression.windowSpec.orderSpec.nonEmpty ||
+          windowExpression.windowSpec.frameSpecification !=
+            SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing) =>
+        agg.failAnalysis(
+          errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+          messageParameters = Map("aggFunc" -> 
toSQLExpr(agg.aggregateFunction))
+        )
+      case _: AggregateExpression | _: FrameLessOffsetWindowFunction | _: 
AggregateWindowFunction =>
+      case other =>
+        other.failAnalysis(
+          errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW",
+          messageParameters = Map("sqlExpr" -> toSQLExpr(other))
+        )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to