This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fdc3754d676 [SPARK-28386][SQL] Cannot resolve ORDER BY columns with 
GROUP BY and HAVING
9fdc3754d676 is described below

commit 9fdc3754d676bcb5de500c87025b9c571a7cf523
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 20 16:46:54 2023 +0800

    [SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING
    
    ### What changes were proposed in this pull request?
    
    This PR enhanced the analyzer to handle the following pattern properly.
    
    ```
    Sort
     - Filter
       - Aggregate
    ```
    
    ### Why are the changes needed?
    
    ```
    spark-sql (default)> CREATE TABLE t1 (flag BOOLEAN, dt STRING);
    
    spark-sql (default)>   SELECT LENGTH(dt),
                       >          COUNT(t1.flag)
                       >     FROM t1
                       > GROUP BY LENGTH(dt)
                       >   HAVING COUNT(t1.flag) > 1
                       > ORDER BY LENGTH(dt);
    [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with 
name `dt` cannot be resolved. Did you mean one of the following? [`length(dt)`, 
`count(flag)`].; line 6 pos 16;
    'Sort ['LENGTH('dt) ASC NULLS FIRST], true
    +- Filter (count(flag)#60L > cast(1 as bigint))
       +- Aggregate [length(dt#9)], [length(dt#9) AS length(dt)#59, 
count(flag#8) AS count(flag)#60L]
          +- SubqueryAlias spark_catalog.default.t1
             +- Relation spark_catalog.default.t1[flag#8,dt#9] parquet
    ```
    
    The above example demonstrates the failure case: the query fails during the 
analysis phase when both `HAVING` and `ORDER BY` clauses are present, but 
succeeds if only one of them is present.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. It is a bug fix: such queries now pass analysis instead of failing.
    
    ### How was this patch tested?
    
    New test cases are added to `having.sql`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44352 from pan3793/SPARK-28386.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  9 +++++++
 .../analysis/ResolveReferencesInSort.scala         | 16 +++++++-----
 .../sql-tests/analyzer-results/having.sql.out      | 29 ++++++++++++++++++++++
 .../udf/postgreSQL/udf-select_having.sql.out       | 11 ++++----
 .../src/test/resources/sql-tests/inputs/having.sql |  6 +++++
 .../resources/sql-tests/results/having.sql.out     | 18 ++++++++++++++
 .../approved-plans-v2_7/q6.sf100/explain.txt       |  8 +++---
 .../approved-plans-v2_7/q6.sf100/simplified.txt    |  2 +-
 .../approved-plans-v2_7/q6/explain.txt             |  8 +++---
 .../approved-plans-v2_7/q6/simplified.txt          |  2 +-
 10 files changed, 87 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e3538647e375..94f6d3346265 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           }
           s.copy(order = newSortOrder, child = newChild)
         })
+
+      case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate))
+          if agg.resolved && cond.resolved && s.order.forall(_.resolved) =>
+        resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs, 
newChild) => {
+          val newSortOrder = s.order.zip(newExprs).map {
+            case (sortOrder, expr) => sortOrder.copy(child = expr)
+          }
+          s.copy(order = newSortOrder, child = f.copy(child = newChild))
+        })
     }
 
     /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
index 02583ebb8f6b..6fa723d4a75f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LogicalPlan, Project, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 
 /**
@@ -28,10 +28,11 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
  *    includes metadata columns as well.
  * 2. Resolves the column to a literal function which is allowed to be invoked 
without braces, e.g.
  *    `SELECT col, current_date FROM t`.
- * 3. If the child plan is Aggregate, resolves the column to 
[[TempResolvedColumn]] with the output
- *    of Aggregate's child plan. This is to allow Sort to host grouping 
expressions and aggregate
- *    functions, which can be pushed down to the Aggregate later. For example,
- *    `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`.
+ * 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the 
column to
+ *    [[TempResolvedColumn]] with the output of Aggregate's child plan.
+ *    This is to allow Sort to host grouping expressions and aggregate 
functions, which can
+ *    be pushed down to the Aggregate later. For example,
+ *    `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
  * 4. Resolves the column to [[AttributeReference]] with the output of a 
descendant plan node.
  *    Spark will propagate the missing attributes from the descendant plan 
node to the Sort node.
  *    This is to allow users to ORDER BY columns that are not in the SELECT 
clause, which is
@@ -51,7 +52,10 @@ class ResolveReferencesInSort(val catalogManager: 
CatalogManager)
 
   def apply(s: Sort): LogicalPlan = {
     val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
-    val resolvedWithAgg = resolvedBasic.map(resolveColWithAgg(_, s.child))
+    val resolvedWithAgg = s.child match {
+      case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, 
agg))
+      case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
+    }
     val (missingAttrResolved, newChild) = 
resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
     val orderByAllResolved = resolveOrderByAll(
       s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
index 12eb5a34146a..d96ea1e43e18 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
@@ -179,3 +179,32 @@ Filter (c1#x = 1)
 +- Aggregate [c1#x], [c1#x]
    +- SubqueryAlias t
       +- LocalRelation [c1#x, c2#x]
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
+-- !query analysis
+Sort [sum(v)#xL ASC NULLS FIRST], true
++- Filter (sum(v)#xL > cast(2 as bigint))
+   +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL]
+      +- SubqueryAlias hav
+         +- View (`hav`, [k#x,v#x])
+            +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+               +- Project [k#x, v#x]
+                  +- SubqueryAlias hav
+                     +- LocalRelation [k#x, v#x]
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
+-- !query analysis
+Project [k#x, sum(v)#xL]
++- Sort [avg(v#x)#x ASC NULLS FIRST], true
+   +- Filter (sum(v)#xL > cast(2 as bigint))
+      +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL, avg(v#x) AS avg(v#x)#x]
+         +- SubqueryAlias hav
+            +- View (`hav`, [k#x,v#x])
+               +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+                  +- Project [k#x, v#x]
+                     +- SubqueryAlias hav
+                        +- LocalRelation [k#x, v#x]
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
index 5f7e92a62f30..ea6b1716869f 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
@@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x]
 SELECT udf(b), udf(c) FROM test_having
        GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
 -- !query analysis
-Project [udf(b)#x, udf(c)#x]
-+- Sort [cast(udf(cast(b#x as string)) as int) ASC NULLS FIRST, 
cast(udf(cast(c#x as string)) as string) ASC NULLS FIRST], true
-   +- Filter (udf(b)#x = 3)
-      +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS 
udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x, b#x, c#x]
-         +- SubqueryAlias spark_catalog.default.test_having
-            +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] 
parquet
+Sort [udf(b)#x ASC NULLS FIRST, udf(c)#x ASC NULLS FIRST], true
++- Filter (udf(b)#x = 3)
+   +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS 
udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x]
+      +- SubqueryAlias spark_catalog.default.test_having
+         +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/having.sql 
b/sql/core/src/test/resources/sql-tests/inputs/having.sql
index 056b99e363d2..4c25a60c8abb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/having.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/having.sql
@@ -33,3 +33,9 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING 
SETS(t.c1) HAVING t.
 SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1;
 SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 = 
1;
 SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1;
+
+-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, when the 
agg function is present in the SELECT list
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v);
+
+-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, when the 
agg function is not present in the SELECT list
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v);
diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out 
b/sql/core/src/test/resources/sql-tests/results/having.sql.out
index 6eaba0b4119c..c9d588642636 100644
--- a/sql/core/src/test/resources/sql-tests/results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/having.sql.out
@@ -134,3 +134,21 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 
HAVING t.c1 = 1
 struct<c1:int>
 -- !query output
 1
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
+-- !query schema
+struct<k:string,sum(v):bigint>
+-- !query output
+three  3
+one    6
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
+-- !query schema
+struct<k:string,sum(v):bigint>
+-- !query output
+one    6
+three  3
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
index afdfc51a17dd..82a6e00c79c4 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
@@ -248,15 +248,15 @@ Input [2]: [ca_state#18, count#22]
 Keys [1]: [ca_state#18]
 Functions [1]: [count(1)]
 Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25, ca_state#18]
+Results [2]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25]
 
 (44) Filter [codegen id : 14]
-Input [3]: [state#24, cnt#25, ca_state#18]
+Input [2]: [state#24, cnt#25]
 Condition : (cnt#25 >= 10)
 
 (45) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#18]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#18 ASC NULLS FIRST], 
[state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, 
cnt#25]
 
 ===== Subqueries =====
 
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
index 7339df16a289..d69eb47d92c8 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
   WholeStageCodegen (14)
     Filter [cnt]
       HashAggregate [ca_state,count] [count(1),state,cnt,count]
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
index a2638dac5645..507d4991a046 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
@@ -218,15 +218,15 @@ Input [2]: [ca_state#2, count#22]
 Keys [1]: [ca_state#2]
 Functions [1]: [count(1)]
 Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25, ca_state#2]
+Results [2]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25]
 
 (38) Filter [codegen id : 8]
-Input [3]: [state#24, cnt#25, ca_state#2]
+Input [2]: [state#24, cnt#25]
 Condition : (cnt#25 >= 10)
 
 (39) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#2]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], 
[state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, 
cnt#25]
 
 ===== Subqueries =====
 
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
index c9c8358ba0b9..a15b638fbb2c 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
   WholeStageCodegen (8)
     Filter [cnt]
       HashAggregate [ca_state,count] [count(1),state,cnt,count]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to