This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b97e11b53c2 [SPARK-42746][SQL] Implement LISTAGG function
4b97e11b53c2 is described below

commit 4b97e11b53c22791f186f514694d0bddab92ab64
Author: Mikhail Nikoliukin <[email protected]>
AuthorDate: Fri Nov 29 21:32:20 2024 +0800

    [SPARK-42746][SQL] Implement LISTAGG function
    
    ### What changes were proposed in this pull request?
    
    Implement new aggregation function `listagg([ALL | DISTINCT] expr[, sep]) 
[WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])]`
    
    ### Why are the changes needed?
    
    Listagg is a popular function implemented by many other vendors. For now, 
users have to use workarounds like 
[this](https://kb.databricks.com/sql/recreate-listagg-functionality-with-spark-sql).
 PR will close the gap.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the new `listagg` function. BigQuery and PostgreSQL have the same 
function but with `string_agg` name so I added it as an alias.
    
    ### How was this patch tested?
    
    With new unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: GitHub Copilot
    
    Closes #48748 from mikhailnik-db/SPARK-42746-add-listagg.
    
    Lead-authored-by: Mikhail Nikoliukin <[email protected]>
    Co-authored-by: Jia Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/unsafe/types/ByteArray.java   |  36 +-
 .../apache/spark/unsafe/array/ByteArraySuite.java  |  55 +++
 .../src/main/resources/error/error-conditions.json |  51 +--
 python/pyspark/sql/tests/test_functions.py         |   8 +-
 .../scala/org/apache/spark/sql/functions.scala     |  71 ++++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   3 +
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  15 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +
 .../sql/catalyst/analysis/FunctionResolution.scala |  17 +-
 .../sql/catalyst/expressions/aggregate/Mode.scala  |   4 +-
 .../aggregate/SupportsOrderingWithinGroup.scala    |  21 +-
 .../catalyst/expressions/aggregate/collect.scala   | 298 +++++++++++++-
 .../expressions/aggregate/percentiles.scala        |  12 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  36 +-
 .../spark/sql/execution/SparkStrategies.scala      |   7 +-
 .../sql-functions/sql-expression-schema.md         |   2 +
 .../analyzer-results/listagg-collations.sql.out    |  86 ++++
 .../sql-tests/analyzer-results/listagg.sql.out     | 435 +++++++++++++++++++++
 .../sql-tests/analyzer-results/mode.sql.out        |   8 +-
 .../sql-tests/analyzer-results/percentiles.sql.out |  10 +-
 .../sql-tests/inputs/listagg-collations.sql        |  12 +
 .../test/resources/sql-tests/inputs/listagg.sql    |  38 ++
 .../sql-tests/results/listagg-collations.sql.out   |  82 ++++
 .../resources/sql-tests/results/listagg.sql.out    | 368 +++++++++++++++++
 .../test/resources/sql-tests/results/mode.sql.out  |   8 +-
 .../sql-tests/results/percentiles.sql.out          |  10 +-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  35 ++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |   4 +-
 .../thriftserver/ThriftServerQueryTestSuite.scala  |   1 +
 29 files changed, 1655 insertions(+), 80 deletions(-)

diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
index aae47aa96320..f12408fb4931 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
@@ -135,27 +135,57 @@ public final class ByteArray {
     return Arrays.copyOfRange(bytes, start, end);
   }
 
+  /**
+   * Concatenate multiple byte arrays into one.
+   * If one of the inputs is null then null will be returned.
+   *
+   * @param inputs byte arrays to concatenate
+   * @return the concatenated byte array or null if one of the arguments is 
null
+   */
   public static byte[] concat(byte[]... inputs) {
+    return concatWS(EMPTY_BYTE, inputs);
+  }
+
+  /**
+   * Concatenate multiple byte arrays with a given delimiter.
+   * If the delimiter or one of the inputs is null then null will be returned.
+   *
+   * @param delimiter byte array to be placed between each input
+   * @param inputs    byte arrays to concatenate
+   * @return the concatenated byte array or null if one of the arguments is 
null
+   */
+  public static byte[] concatWS(byte[] delimiter, byte[]... inputs) {
+    if (delimiter == null) {
+      return null;
+    }
     // Compute the total length of the result
     long totalLength = 0;
     for (byte[] input : inputs) {
       if (input != null) {
-        totalLength += input.length;
+        totalLength += input.length + delimiter.length;
       } else {
         return null;
       }
     }
-
+    if (totalLength > 0) totalLength -= delimiter.length;
     // Allocate a new byte array, and copy the inputs one by one into it
     final byte[] result = new byte[Ints.checkedCast(totalLength)];
     int offset = 0;
-    for (byte[] input : inputs) {
+    for (int i = 0; i < inputs.length; i++) {
+      byte[] input = inputs[i];
       int len = input.length;
       Platform.copyMemory(
         input, Platform.BYTE_ARRAY_OFFSET,
         result, Platform.BYTE_ARRAY_OFFSET + offset,
         len);
       offset += len;
+      if (delimiter.length > 0 && i < inputs.length - 1) {
+        Platform.copyMemory(
+          delimiter, Platform.BYTE_ARRAY_OFFSET,
+          result, Platform.BYTE_ARRAY_OFFSET + offset,
+          delimiter.length);
+        offset += delimiter.length;
+      }
     }
     return result;
   }
diff --git 
a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java 
b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
index aff619175ff7..5e221b4e359d 100644
--- 
a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
+++ 
b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
@@ -67,4 +67,59 @@ public class ByteArraySuite {
     byte[] y4 = new byte[]{(byte) 100, (byte) 200};
     Assertions.assertEquals(0, ByteArray.compareBinary(x4, y4));
   }
+
+  @Test
+  public void testConcat() {
+    byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+    byte[] result1 = ByteArray.concat(x1, y1);
+    byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 4, 
(byte) 5, (byte) 6};
+    Assertions.assertArrayEquals(expected1, result1);
+
+    byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y2 = new byte[0];
+    byte[] result2 = ByteArray.concat(x2, y2);
+    byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    Assertions.assertArrayEquals(expected2, result2);
+
+    byte[] x3 = new byte[0];
+    byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+    byte[] result3 = ByteArray.concat(x3, y3);
+    byte[] expected3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+    Assertions.assertArrayEquals(expected3, result3);
+
+    byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y4 = null;
+    byte[] result4 = ByteArray.concat(x4, y4);
+    Assertions.assertArrayEquals(null, result4);
+  }
+
+  @Test
+  public void testConcatWS() {
+    byte[] separator = new byte[]{(byte) 42};
+
+    byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+    byte[] result1 = ByteArray.concatWS(separator, x1, y1);
+    byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42,
+            (byte) 4, (byte) 5, (byte) 6};
+    Assertions.assertArrayEquals(expected1, result1);
+
+    byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y2 = new byte[0];
+    byte[] result2 = ByteArray.concatWS(separator, x2, y2);
+    byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42};
+    Assertions.assertArrayEquals(expected2, result2);
+
+    byte[] x3 = new byte[0];
+    byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+    byte[] result3 = ByteArray.concatWS(separator, x3, y3);
+    byte[] expected3 = new byte[]{(byte) 42, (byte) 4, (byte) 5, (byte) 6};
+    Assertions.assertArrayEquals(expected3, result3);
+
+    byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+    byte[] y4 = null;
+    byte[] result4 = ByteArray.concatWS(separator, x4, y4);
+    Assertions.assertArrayEquals(null, result4);
+  }
 }
diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 77437f6c5617..024caf86cf94 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2627,29 +2627,6 @@
     ],
     "sqlState" : "22006"
   },
-  "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : {
-    "message" : [
-      "Invalid inverse distribution function <funcName>."
-    ],
-    "subClass" : {
-      "DISTINCT_UNSUPPORTED" : {
-        "message" : [
-          "Cannot use DISTINCT with WITHIN GROUP."
-        ]
-      },
-      "WITHIN_GROUP_MISSING" : {
-        "message" : [
-          "WITHIN GROUP is required for inverse distribution function."
-        ]
-      },
-      "WRONG_NUM_ORDERINGS" : {
-        "message" : [
-          "Requires <expectedNum> orderings in WITHIN GROUP but got 
<actualNum>."
-        ]
-      }
-    },
-    "sqlState" : "42K0K"
-  },
   "INVALID_JAVA_IDENTIFIER_AS_FIELD_NAME" : {
     "message" : [
       "<fieldName> is not a valid identifier of Java and cannot be used as 
field name",
@@ -3364,6 +3341,34 @@
     ],
     "sqlState" : "42601"
   },
+  "INVALID_WITHIN_GROUP_EXPRESSION" : {
+    "message" : [
+      "Invalid function <funcName> with WITHIN GROUP."
+    ],
+    "subClass" : {
+      "DISTINCT_UNSUPPORTED" : {
+        "message" : [
+          "The function does not support DISTINCT with WITHIN GROUP."
+        ]
+      },
+      "MISMATCH_WITH_DISTINCT_INPUT" : {
+        "message" : [
+          "The function is invoked with DISTINCT and WITHIN GROUP but 
expressions <funcArg> and <orderingExpr> do not match. The WITHIN GROUP 
ordering expression must be picked from the function inputs."
+        ]
+      },
+      "WITHIN_GROUP_MISSING" : {
+        "message" : [
+          "WITHIN GROUP is required for the function."
+        ]
+      },
+      "WRONG_NUM_ORDERINGS" : {
+        "message" : [
+          "The function requires <expectedNum> orderings in WITHIN GROUP but 
got <actualNum>."
+        ]
+      }
+    },
+    "sqlState" : "42K0K"
+  },
   "INVALID_WRITER_COMMIT_MESSAGE" : {
     "message" : [
       "The data source writer has generated an invalid number of commit 
messages. Expected exactly one writer commit message from each task, but 
received <detail>."
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index cf8f685ea449..6c7ce8007292 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -83,7 +83,13 @@ class FunctionsTestsMixin:
         missing_in_py = jvm_fn_set.difference(py_fn_set)
 
         # Functions that we expect to be missing in python until they are 
added to pyspark
-        expected_missing_in_py = set()
+        expected_missing_in_py = {
+            # TODO(SPARK-50220): listagg functions will soon be added and 
removed from this list
+            "listagg_distinct",
+            "listagg",
+            "string_agg",
+            "string_agg_distinct",
+        }
 
         self.assertEqual(
             expected_missing_in_py, missing_in_py, "Missing functions in 
pyspark not as expected"
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 2a04212ee258..9f509fa843a2 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1147,6 +1147,77 @@ object functions {
    */
   def sum_distinct(e: Column): Column = Column.fn("sum", isDistinct = true, e)
 
+  /**
+   * Aggregate function: returns the concatenation of non-null input values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column): Column = Column.fn("listagg", e)
+
+  /**
+   * Aggregate function: returns the concatenation of non-null input values, 
separated by the
+   * delimiter.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg(e: Column, delimiter: Column): Column = Column.fn("listagg", e, 
delimiter)
+
+  /**
+   * Aggregate function: returns the concatenation of distinct non-null input 
values.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg_distinct(e: Column): Column = Column.fn("listagg", isDistinct = 
true, e)
+
+  /**
+   * Aggregate function: returns the concatenation of distinct non-null input 
values, separated by
+   * the delimiter.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def listagg_distinct(e: Column, delimiter: Column): Column =
+    Column.fn("listagg", isDistinct = true, e, delimiter)
+
+  /**
+   * Aggregate function: returns the concatenation of non-null input values. 
Alias for `listagg`.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def string_agg(e: Column): Column = Column.fn("string_agg", e)
+
+  /**
+   * Aggregate function: returns the concatenation of non-null input values, 
separated by the
+   * delimiter. Alias for `listagg`.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def string_agg(e: Column, delimiter: Column): Column = 
Column.fn("string_agg", e, delimiter)
+
+  /**
+   * Aggregate function: returns the concatenation of distinct non-null input 
values. Alias for
+   * `listagg`.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def string_agg_distinct(e: Column): Column = Column.fn("string_agg", 
isDistinct = true, e)
+
+  /**
+   * Aggregate function: returns the concatenation of distinct non-null input 
values, separated by
+   * the delimiter. Alias for `listagg`.
+   *
+   * @group agg_funcs
+   * @since 4.0.0
+   */
+  def string_agg_distinct(e: Column, delimiter: Column): Column =
+    Column.fn("string_agg", isDistinct = true, e, delimiter)
+
   /**
    * Aggregate function: alias for `var_samp`.
    *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8e1b9da927c9..3af3565220bd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2772,6 +2772,9 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           ne
         case e: Expression if e.foldable =>
           e // No need to create an attribute reference if it will be 
evaluated as a Literal.
+        case e: SortOrder =>
+          // For SortOrder, just recursively extract from the child expression.
+          e.copy(child = extractExpr(e.child))
         case e: NamedArgumentExpression =>
           // For NamedArgumentExpression, we extract the value and replace it 
with
           // an AttributeReference (with an internal column name, e.g. "_w0").
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 573619af1b5f..1c76fd7d00f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, Median, PercentileCont, PercentileDisc}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, 
DecorrelateInnerQuery, InlineCTE}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -423,10 +423,23 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
                 "funcName" -> toSQLExpr(wf),
                 "windowExpr" -> toSQLExpr(w)))
 
+          case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _)
+            if agg.isDistinct && listAgg.needSaveOrderValue =>
+            throw 
QueryCompilationErrors.functionAndOrderExpressionMismatchError(
+              listAgg.prettyName, listAgg.child, listAgg.orderExpressions)
+
           case w: WindowExpression =>
             // Only allow window functions with an aggregate expression or an 
offset window
             // function or a Pandas window UDF.
             w.windowFunction match {
+              case agg @ AggregateExpression(fun: ListAgg, _, _, _, _)
+                // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY 
...) is unsupported
+                if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty ||
+                  w.windowSpec.frameSpecification !=
+                  SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
UnboundedFollowing)) =>
+                agg.failAnalysis(
+                  errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+                  messageParameters = Map("aggFunc" -> 
toSQLExpr(agg.aggregateFunction)))
               case agg @ AggregateExpression(
                 _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
                 if w.windowSpec.orderSpec.nonEmpty || 
w.windowSpec.frameSpecification !=
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5103f8048856..d9e9f49ce065 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -506,6 +506,8 @@ object FunctionRegistry {
     expression[CollectList]("collect_list"),
     expression[CollectList]("array_agg", true, Some("3.3.0")),
     expression[CollectSet]("collect_set"),
+    expression[ListAgg]("listagg"),
+    expression[ListAgg]("string_agg", setAlias = true),
     expressionBuilder("count_min_sketch", CountMinSketchAggExpressionBuilder),
     expression[BoolAnd]("every", true),
     expression[BoolAnd]("bool_and"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 5a27a7219032..800126e0030e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -128,18 +128,15 @@ class FunctionResolution(
       numArgs: Int,
       u: UnresolvedFunction): Expression = {
     func match {
-      case owg: SupportsOrderingWithinGroup if u.isDistinct =>
-        throw 
QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
-          owg.prettyName
-        )
+      case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && 
u.isDistinct =>
+        throw 
QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError(owg.prettyName)
       case owg: SupportsOrderingWithinGroup
-          if !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
-        throw 
QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
-          owg.prettyName
-        )
+          if owg.isOrderingMandatory && !owg.orderingFilled && 
u.orderingWithinGroup.isEmpty =>
+        throw 
QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName)
       case owg: SupportsOrderingWithinGroup
           if owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
-        throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+        // e.g., mode(expr1) within group (order by expr2) is not supported
+        throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
           owg.prettyName,
           0,
           u.orderingWithinGroup.length
@@ -198,7 +195,7 @@ class FunctionResolution(
       case agg: AggregateFunction =>
         // Note: PythonUDAF does not support these advanced clauses.
         if (agg.isInstanceOf[PythonUDAF]) checkUnsupportedAggregateClause(agg, 
u)
-        // After parse, the inverse distribution functions not set the 
ordering within group yet.
+        // After parsing, the functions have not set the ordering within group yet.
         val newAgg = agg match {
           case owg: SupportsOrderingWithinGroup
               if !owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index 97add0b8e45b..f3eeaa96b3d4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -183,6 +183,8 @@ case class Mode(
   }
 
   override def orderingFilled: Boolean = child != UnresolvedWithinGroup
+  override def isOrderingMandatory: Boolean = true
+  override def isDistinctSupported: Boolean = false
 
   assert(orderingFilled || (!orderingFilled && reverseOpt.isEmpty))
 
@@ -190,7 +192,7 @@ case class Mode(
     child match {
       case UnresolvedWithinGroup =>
         if (orderingWithinGroup.length != 1) {
-          throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+          throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
             nodeName, 1, orderingWithinGroup.length)
         }
         orderingWithinGroup.head match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
index 9c0502a2c1fc..453251ac61cd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
@@ -20,9 +20,26 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
 trait SupportsOrderingWithinGroup { self: AggregateFunction =>
-  def orderingFilled: Boolean = false
   def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction
+
+  /** Indicator that ordering was set. */
+  def orderingFilled: Boolean
+
+  /**
+   * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function.
+   *
+   * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+   */
+  def isOrderingMandatory: Boolean
+
+  /**
+   * Tells Analyzer that DISTINCT is supported.
+   * The DISTINCT can conflict with order so some functions can ban it.
+   *
+   * @see [[QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError]]
+   */
+  def isDistinctSupported: Boolean
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 3aaf353043a9..7789c23b50a4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -18,16 +18,22 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import scala.collection.mutable
-import scala.collection.mutable.Growable
+import scala.collection.mutable.{ArrayBuffer, Growable}
+import scala.util.{Left, Right}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
TypeUtils, UnsafeRowUtils}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
 import org.apache.spark.util.BoundedPriorityQueue
 
 /**
@@ -36,8 +42,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that 
too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends 
TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends 
TypedImperativeAggregate[T] {
 
   val child: Expression
 
@@ -102,7 +107,8 @@ abstract class Collect[T <: Growable[Any] with 
Iterable[Any]] extends TypedImper
 case class CollectList(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] {
+    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+  with UnaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
@@ -149,7 +155,7 @@ case class CollectSet(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
     inputAggBufferOffset: Int = 0)
-  extends Collect[mutable.HashSet[Any]] with QueryErrorsBase {
+  extends Collect[mutable.HashSet[Any]] with QueryErrorsBase with 
UnaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
@@ -215,7 +221,8 @@ case class CollectTopK(
     num: Int,
     reverse: Boolean = false,
     mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]] {
+    inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]]
+  with UnaryLike[Expression] {
   assert(num > 0)
 
   def this(child: Expression, num: Int) = this(child, num, false, 0, 0)
@@ -265,3 +272,280 @@ private[aggregate] object CollectTopK {
     case _ => throw QueryCompilationErrors.invalidNumParameter(e)
   }
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] 
[,...])] - Returns
+    the concatenation of non-null input values, separated by the delimiter 
ordered by key.
+    If all values are null, null is returned.
+    """,
+  arguments = """
+    Arguments:
+      * expr - a string or binary expression to be concatenated.
+      * delimiter - an optional string or binary foldable expression used to 
separate the input values.
+        If null, the concatenation will be performed without a delimiter. 
Default is null.
+      * key - an optional expression for ordering the input values. Multiple 
keys can be specified.
+        If none are specified, the order of the rows in the result is 
non-deterministic.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       abc
+      > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), 
('b'), ('c') AS tab(col);
+       cba
+      > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col);
+       ab
+      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+       aa
+      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+       ab
+      > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+       a, b, c
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+       NULL
+  """,
+  note = """
+    * If the order is not specified, the function is non-deterministic because
+    the order of the rows may be non-deterministic after a shuffle.
+    * If DISTINCT is specified, then expr and key must be the same expression.
+  """,
+  group = "agg_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ListAgg(
+    child: Expression,
+    delimiter: Expression = Literal(null),
+    orderExpressions: Seq[SortOrder] = Nil,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0)
+  extends Collect[mutable.ArrayBuffer[Any]]
+  with SupportsOrderingWithinGroup
+  with ImplicitCastInputTypes {
+
+  override def orderingFilled: Boolean = orderExpressions.nonEmpty
+
+  override def isOrderingMandatory: Boolean = false
+
+  override def isDistinctSupported: Boolean = true
+
+  override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction =
+    copy(orderExpressions = orderingWithinGroup)
+
+  override protected lazy val bufferElementType: DataType = {
+    if (!needSaveOrderValue) {
+      child.dataType
+    } else {
+      StructType(
+        StructField("value", child.dataType)
+        +: orderValuesField
+      )
+    }
+  }
+  /** Indicates that [[child]] values alone are not enough for sorting */
+  lazy val needSaveOrderValue: Boolean = !isOrderCompatible(orderExpressions)
+
+  def this(child: Expression) =
+    this(child, Literal(null), Nil, 0, 0)
+
+  def this(child: Expression, delimiter: Expression) =
+    this(child, delimiter, Nil, 0, 0)
+
+  override def nullable: Boolean = true
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 
mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): 
ImperativeAggregate =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def defaultResult: Option[Literal] = Option(Literal.create(null, 
dataType))
+
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val withinGroup = if (orderingFilled) {
+      s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})"
+    } else {
+      ""
+    }
+    s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup"
+  }
+
+  override def inputTypes: Seq[AbstractDataType] =
+    TypeCollection(
+      StringTypeWithCollation(supportsTrimCollation = true),
+      BinaryType
+    ) +:
+    TypeCollection(
+      StringTypeWithCollation(supportsTrimCollation = true),
+      BinaryType,
+      NullType
+    ) +:
+    orderExpressions.map(_ => AnyDataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val matchInputTypes = super.checkInputDataTypes()
+    if (matchInputTypes.isFailure) {
+      matchInputTypes
+    } else if (!delimiter.foldable) {
+      DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("delimiter"),
+          "inputType" -> toSQLType(delimiter.dataType),
+          "inputExpr" -> toSQLExpr(delimiter)
+        )
+      )
+    } else if (delimiter.dataType == NullType) {
+      // null is the default empty delimiter so type is not important
+      TypeCheckSuccess
+    } else {
+      TypeUtils.checkForSameTypeInputExpr(child.dataType :: delimiter.dataType 
:: Nil, prettyName)
+    }
+  }
+
+  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+    if (buffer.nonEmpty) {
+      val sortedBufferWithoutNulls = sortBuffer(buffer)
+      concatSkippingNulls(sortedBufferWithoutNulls)
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Sorts the buffer according to orderExpressions.
+   * If orderExpressions is empty, the buffer is returned as is.
+   * The format of the buffer is determined by [[needSaveOrderValue]].
+   * @return sorted buffer containing only child's values
+   */
+  private[this] def sortBuffer(buffer: mutable.ArrayBuffer[Any]): 
mutable.ArrayBuffer[Any] = {
+    if (!orderingFilled) {
+      // without order return as is.
+      return buffer
+    }
+    if (!needSaveOrderValue) {
+      // Here the buffer has structure [childValue0, childValue1, ...]
+      // and we want to sort it by childValues
+      val sortOrderExpression = orderExpressions.head
+      val ascendingOrdering = 
PhysicalDataType.ordering(sortOrderExpression.dataType)
+      val ordering =
+        if (sortOrderExpression.direction == Ascending) ascendingOrdering
+        else ascendingOrdering.reverse
+      buffer.sorted(ordering)
+    } else {
+      // Here the buffer has structure
+      // [[childValue, orderValue0, orderValue1, ...],
+      //  [childValue, orderValue0, orderValue1, ...],
+      //  ...]
+      // and we want to sort it by tuples (orderValue0, orderValue1, ...)
+      buffer
+        .asInstanceOf[mutable.ArrayBuffer[InternalRow]]
+        .sorted(bufferOrdering)
+        // drop orderValues after sort
+        .map(_.get(0, child.dataType))
+    }
+  }
+
+  /**
+   * @return ordering by (orderValue0, orderValue1, ...)
+   *         for InternalRow with format [childValue, orderValue0, 
orderValue1, ...]
+   */
+  private[this] def bufferOrdering: Ordering[InternalRow] = {
+    val bufferSortOrder = orderExpressions.zipWithIndex.map {
+      case (originalOrder, i) =>
+        originalOrder.copy(
+          // first value is the evaluated child so add +1 for order's values
+          child = BoundReference(i + 1, originalOrder.dataType, 
originalOrder.child.nullable)
+        )
+    }
+    new InterpretedOrdering(bufferSortOrder)
+  }
+
+  private[this] def concatSkippingNulls(buffer: mutable.ArrayBuffer[Any]): Any 
= {
+    getDelimiterValue match {
+      case Right(delimiterValue: Array[Byte]) =>
+        val inputs = buffer.filter(_ != null).map(_.asInstanceOf[Array[Byte]])
+        ByteArray.concatWS(delimiterValue, inputs.toSeq: _*)
+      case Left(delimiterValue: UTF8String) =>
+        val inputs = buffer.filter(_ != null).map(_.asInstanceOf[UTF8String])
+        UTF8String.concatWs(delimiterValue, inputs.toSeq: _*)
+    }
+  }
+
+  /**
+   * @return delimiter value or default empty value if delimiter is null. Type 
respects [[dataType]]
+   */
+  private[this] def getDelimiterValue: Either[UTF8String, Array[Byte]] = {
+    val delimiterValue = delimiter.eval()
+    dataType match {
+      case _: StringType =>
+        Left(
+          if (delimiterValue == null) UTF8String.fromString("")
+          else delimiterValue.asInstanceOf[UTF8String]
+        )
+      case _: BinaryType =>
+        Right(
+          if (delimiterValue == null) ByteArray.EMPTY_BYTE
+          else delimiterValue.asInstanceOf[Array[Byte]]
+        )
+    }
+  }
+
+  override def dataType: DataType = child.dataType
+
+  override def update(buffer: ArrayBuffer[Any], input: InternalRow): 
ArrayBuffer[Any] = {
+    val value = child.eval(input)
+    if (value != null) {
+      val v = if (!needSaveOrderValue) {
+        convertToBufferElement(value)
+      } else {
+        InternalRow.fromSeq(convertToBufferElement(value) +: 
evalOrderValues(input))
+      }
+      buffer += v
+    }
+    buffer
+  }
+
+  private[this] def evalOrderValues(internalRow: InternalRow): Seq[Any] = {
+    orderExpressions.map(order => 
convertToBufferElement(order.child.eval(internalRow)))
+  }
+
+  override protected def convertToBufferElement(value: Any): Any = 
InternalRow.copyValue(value)
+
+  override def children: Seq[Expression] = child +: delimiter +: 
orderExpressions
+
+  /**
+   * Utility func to check if given order is empty or a single order by 
[[child]].
+   *
+   * @see [[QueryCompilationErrors.functionAndOrderExpressionMismatchError]]
+   * @see [[needSaveOrderValue]]
+   */
+  private[this] def isOrderCompatible(someOrder: Seq[SortOrder]): Boolean = {
+    if (someOrder.isEmpty) {
+      return true
+    }
+    if (someOrder.size == 1 && someOrder.head.child.semanticEquals(child)) {
+      return true
+    }
+    false
+  }
+
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[Expression]): Expression =
+    copy(
+      child = newChildren.head,
+      delimiter = newChildren(1),
+      orderExpressions = newChildren
+        .drop(2)
+        .map(_.asInstanceOf[SortOrder])
+    )
+
+  private[this] def orderValuesField: Seq[StructField] = {
+    orderExpressions.zipWithIndex.map {
+      case (order, i) => StructField(s"sortOrderValue[$i]", order.dataType)
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 89a6984b8085..6dfa1b499df2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -378,7 +378,7 @@ case class PercentileCont(left: Expression, right: 
Expression, reverse: Boolean
 
   override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction = {
     if (orderingWithinGroup.length != 1) {
-      throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+      throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
         nodeName, 1, orderingWithinGroup.length)
     }
     orderingWithinGroup.head match {
@@ -390,6 +390,10 @@ case class PercentileCont(left: Expression, right: 
Expression, reverse: Boolean
   override protected def withNewChildrenInternal(
       newLeft: Expression, newRight: Expression): PercentileCont =
     this.copy(left = newLeft, right = newRight)
+
+  override def orderingFilled: Boolean = left != UnresolvedWithinGroup
+  override def isOrderingMandatory: Boolean = true
+  override def isDistinctSupported: Boolean = false
 }
 
 /**
@@ -432,7 +436,7 @@ case class PercentileDisc(
 
   override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction = {
     if (orderingWithinGroup.length != 1) {
-      throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+      throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
         nodeName, 1, orderingWithinGroup.length)
     }
     orderingWithinGroup.head match {
@@ -467,6 +471,10 @@ case class PercentileDisc(
       toDoubleValue(higherKey)
     }
   }
+
+  override def orderingFilled: Boolean = left != UnresolvedWithinGroup
+  override def isOrderingMandatory: Boolean = true
+  override def isDistinctSupported: Boolean = false
 }
 
 // scalastyle:off line.size.limit
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 03471ae8a3da..7d7a490c9790 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -27,7 +27,7 @@ import 
org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentif
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
FunctionAlreadyExistsException, NamespaceAlreadyExistsException, 
NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, 
NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, 
GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, 
WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, 
GroupingID, NamedExpression, SortOrder, SpecifiedWindowFrame, WindowFrame, 
WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, 
InputParameter, Join, LogicalPlan, SerdeInfo, Window}
@@ -725,28 +725,32 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "windowExpr" -> toSQLExpr(windowExpr)))
   }
 
-  def distinctInverseDistributionFunctionUnsupportedError(funcName: String): 
Throwable = {
+  def distinctWithOrderingFunctionUnsupportedError(funcName: String): 
Throwable = {
     new AnalysisException(
-      errorClass = 
"INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
-      messageParameters = Map("funcName" -> toSQLId(funcName)))
+      errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
+      messageParameters = Map("funcName" -> toSQLId(funcName))
+    )
   }
 
-  def inverseDistributionFunctionMissingWithinGroupError(funcName: String): 
Throwable = {
+  def functionMissingWithinGroupError(funcName: String): Throwable = {
     new AnalysisException(
-      errorClass = 
"INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
-      messageParameters = Map("funcName" -> toSQLId(funcName)))
+      errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
+      messageParameters = Map("funcName" -> toSQLId(funcName))
+    )
   }
 
-  def wrongNumOrderingsForInverseDistributionFunctionError(
+  def wrongNumOrderingsForFunctionError(
       funcName: String,
       validOrderingsNumber: Int,
       actualOrderingsNumber: Int): Throwable = {
     new AnalysisException(
-      errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+      errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
       messageParameters = Map(
         "funcName" -> toSQLId(funcName),
         "expectedNum" -> validOrderingsNumber.toString,
-        "actualNum" -> actualOrderingsNumber.toString))
+        "actualNum" -> actualOrderingsNumber.toString
+      )
+    )
   }
 
   def aliasNumberNotMatchColumnNumberError(
@@ -1049,6 +1053,18 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "operation" -> operation))
   }
 
+  def functionAndOrderExpressionMismatchError(
+      functionName: String,
+      functionArg: Expression,
+      orderExpr: Seq[SortOrder]): Throwable = {
+    new AnalysisException(
+      errorClass = 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+      messageParameters = Map(
+        "funcName" -> toSQLId(functionName),
+        "funcArg" -> toSQLExpr(functionArg),
+        "orderingExpr" -> orderExpr.map(order => 
toSQLExpr(order.child)).mkString(", ")))
+  }
+
   def wrongCommandForObjectTypeError(
       operation: String,
       requiredType: String,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c621c151c0bd..e77c050fe887 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -607,7 +607,12 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
             // [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed 
because those two distinct
             // aggregates have different column expressions.
             val distinctExpressions =
-              
functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
+            functionsWithDistinct.head.aggregateFunction.children
+              .filterNot(_.foldable)
+              .map {
+                case s: SortOrder => s.child
+                case e => e
+              }
             val normalizedNamedDistinctExpressions = distinctExpressions.map { 
e =>
               // Ideally this should be done in `NormalizeFloatingNumbers`, 
but we do it here
               // because `distinctExpressions` is not extracted during logical 
phase.
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index c54e09735a9b..39cefdaa892b 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -425,6 +425,8 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis | kurtosis | 
SELECT kurtosis(col) FROM VALUES (-10), (-20), (100), (1000) AS tab(col) | 
struct<kurtosis(col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Last | last | SELECT 
last(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Last | last_value | 
SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col) | 
struct<last_value(col):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | listagg | 
SELECT listagg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | 
struct<listagg(col, NULL):string> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | string_agg | 
SELECT string_agg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | 
struct<string_agg(col, NULL):string> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Max | max | SELECT 
max(col) FROM VALUES (10), (50), (20) AS tab(col) | struct<max(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy | max_by | SELECT 
max_by(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y) | 
struct<max_by(x, y):string> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Median | median | SELECT 
median(col) FROM VALUES (0), (10) AS tab(col) | struct<median(col):double> |
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out
new file mode 100644
index 000000000000..ca471858a541
--- /dev/null
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out
@@ -0,0 +1,86 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, utf8_binary) ASC NULLS FIRST, 0, 
0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_binary) ASC 
NULLS FIRST)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, utf8_lcase) ASC NULLS FIRST, 0, 
0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC 
NULLS FIRST)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_binary), null, 0, 0) AS 
listagg(DISTINCT collate(c1, utf8_binary), NULL)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, 0, 0) AS 
listagg(DISTINCT collate(c1, utf8_lcase), NULL)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, collate(c1#x, 
utf8_lcase) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT collate(c1, utf8_lcase), 
NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC NULLS FIRST)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc  '), 
('abc '), ('x'), ('abc')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, unicode_rtrim), null, 0, 0) AS 
listagg(DISTINCT collate(c1, unicode_rtrim), NULL)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc  '), ('abc 
'), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, c1#x ASC NULLS FIRST, 0, 0) AS listagg(c1, 
NULL) WITHIN GROUP (ORDER BY c1 ASC NULLS FIRST)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM 
(VALUES ('abc  '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, unicode_rtrim) ASC NULLS FIRST, 
0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, unicode_rtrim) 
ASC NULLS FIRST)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"collate(c1, utf8_lcase)\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"collate(c1, utf8_binary)\""
+  }
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out
new file mode 100644
index 000000000000..71eb3f8ca76b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out
@@ -0,0 +1,435 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query analysis
+CreateViewCommand `df`, SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 
'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b), false, false, LocalTempView, 
UNSUPPORTED, true
+   +- Project [a#x, b#x]
+      +- SubqueryAlias t
+         +- Project [col1#x AS a#x, col2#x AS b#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query analysis
+CreateViewCommand `df2`, SELECT * FROM (VALUES (1, true), (2, false), (3, 
false)) AS t(a, b), false, false, LocalTempView, UNSUPPORTED, true
+   +- Project [a#x, b#x]
+      +- SubqueryAlias t
+         +- Project [col1#x AS a#x, col2#x AS b#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [string_agg(b#x, null, 0, 0) AS string_agg(b, NULL)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query analysis
+Aggregate [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- Filter NOT (1 = 1)
+   +- SubqueryAlias df
+      +- View (`df`, [a#x, b#x])
+         +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+            +- Project [a#x, b#x]
+               +- SubqueryAlias t
+                  +- Project [col1#x AS a#x, col2#x AS b#x]
+                     +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, |, 0, 0) AS listagg(b, |)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, 0, 0) AS listagg(a, NULL)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query analysis
+Aggregate [listagg(distinct a#x, null, 0, 0) AS listagg(DISTINCT a, NULL)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, a#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) 
WITHIN GROUP (ORDER BY a ASC NULLS FIRST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) 
WITHIN GROUP (ORDER BY a DESC NULLS LAST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query analysis
+Project [listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER 
(PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
++- Project [a#x, b#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS 
LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER 
(PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
+   +- Window [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) 
windowspecdefinition(b#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
unboundedfollowing$())) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS 
LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING)#x], [b#x]
+      +- Project [a#x, b#x]
+         +- SubqueryAlias df
+            +- View (`df`, [a#x, b#x])
+               +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS 
b#x]
+                  +- Project [a#x, b#x]
+                     +- SubqueryAlias t
+                        +- Project [col1#x AS a#x, col2#x AS b#x]
+                           +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) 
WITHIN GROUP (ORDER BY b ASC NULLS FIRST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) 
WITHIN GROUP (ORDER BY b DESC NULLS LAST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, |, b#x DESC NULLS LAST, 0, 0) AS listagg(a, |) WITHIN 
GROUP (ORDER BY b DESC NULLS LAST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x ASC NULLS FIRST, 0, 0) 
AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a ASC NULLS 
FIRST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x DESC NULLS LAST, 0, 0) 
AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a DESC NULLS 
LAST)#x]
++- SubqueryAlias df
+   +- View (`df`, [a#x, b#x])
+      +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, 0x42, 0, 0) AS listagg(c1, X'42')#x]
++- SubqueryAlias t
+   +- Project [col1#x AS c1#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query analysis
+Aggregate [listagg(cast(a#x as string), null, 0, 0) AS listagg(a, NULL)#x, 
listagg(cast(b#x as string), ,, 0, 0) AS listagg(b, ,)#x]
++- SubqueryAlias df2
+   +- View (`df2`, [a#x, b#x])
+      +- Project [cast(a#x as int) AS a#x, cast(b#x as boolean) AS b#x]
+         +- Project [a#x, b#x]
+            +- SubqueryAlias t
+               +- Project [col1#x AS a#x, col2#x AS b#x]
+                  +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c1\"",
+    "inputType" : "\"ARRAY<STRING>\"",
+    "paramIndex" : "first",
+    "requiredType" : "(\"STRING\" or \"BINARY\")",
+    "sqlExpr" : "\"listagg(c1, NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 18,
+    "fragment" : "listagg(c1)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "dataType" : "(\"BINARY\" or \"STRING\")",
+    "functionName" : "`listagg`",
+    "sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"a\"",
+    "inputName" : "`delimiter`",
+    "inputType" : "\"STRING\"",
+    "sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 20,
+    "fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query analysis
+Project [listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)#x]
++- Project [a#x, listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x, listagg(a, NULL) OVER (ORDER BY 
a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x]
+   +- Window [listagg(a#x, null, 0, 0) windowspecdefinition(a#x ASC NULLS 
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) 
AS listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW)#x], [a#x ASC NULLS FIRST]
+      +- Project [a#x]
+         +- SubqueryAlias df
+            +- View (`df`, [a#x, b#x])
+               +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS 
b#x]
+                  +- Project [a#x, b#x]
+                     +- SubqueryAlias t
+                        +- Project [col1#x AS a#x, col2#x AS b#x]
+                           +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 61,
+    "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 64,
+    "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"a\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"b\""
+  }
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"a\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"a\", \"b\""
+  }
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
index d6ecbc72a717..8028c344140f 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
@@ -74,7 +74,7 @@ SELECT department, mode(DISTINCT salary) FROM basic_pays 
GROUP BY department ORD
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -379,7 +379,7 @@ FROM basic_pays
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -401,7 +401,7 @@ FROM basic_pays
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -423,7 +423,7 @@ FROM basic_pays
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "actualNum" : "1",
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
index 4a31cff8c7d0..eb8102afa47e 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
@@ -248,7 +248,7 @@ FROM aggr
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -270,7 +270,7 @@ FROM aggr
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -342,7 +342,7 @@ FROM aggr
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -364,7 +364,7 @@ FROM aggr
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -386,7 +386,7 @@ FROM aggr
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "actualNum" : "2",
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql 
b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql
new file mode 100644
index 000000000000..35f86183c37b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql
@@ -0,0 +1,12 @@
+-- Test cases with collations
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc  '), 
('abc '), ('x'), ('abc')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc  '), ('abc 
'), ('abc\n'), ('abc'), ('x')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM 
(VALUES ('abc  '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1);
+
+-- Error case with collations
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1);
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/inputs/listagg.sql 
b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql
new file mode 100644
index 000000000000..15c8cfa823e9
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql
@@ -0,0 +1,38 @@
+-- Create temporary views
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b);
+
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b);
+
+-- Test cases for listagg function
+SELECT listagg(b) FROM df GROUP BY a;
+SELECT string_agg(b) FROM df GROUP BY a;
+SELECT listagg(b, NULL) FROM df GROUP BY a;
+SELECT listagg(b) FROM df WHERE 1 != 1;
+SELECT listagg(b, '|') FROM df GROUP BY a;
+SELECT listagg(a) FROM df;
+SELECT listagg(DISTINCT a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df;
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df;
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(a), listagg(b, ',') FROM df2;
+
+-- Error cases
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1);
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(b, a) FROM df GROUP BY a;
+SELECT listagg(a) OVER (ORDER BY a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df;
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df;
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df;
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df;
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df;
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out 
b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out
new file mode 100644
index 000000000000..cf3bac04f09c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out
@@ -0,0 +1,82 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_binary) ASC 
NULLS FIRST):string>
+-- !query output
+ABab
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES 
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC 
NULLS FIRST):string>
+-- !query output
+aAbB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_binary), NULL):string>
+-- !query output
+aAbB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), 
('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_lcase), NULL):string collate 
UTF8_LCASE>
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_lcase), NULL) WITHIN GROUP (ORDER BY 
collate(c1, utf8_lcase) ASC NULLS FIRST):string collate UTF8_LCASE>
+-- !query output
+aB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc  '), 
('abc '), ('x'), ('abc')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, unicode_rtrim), NULL):string collate 
UNICODE_RTRIM>
+-- !query output
+abc  x
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc  '), ('abc 
'), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY c1 ASC NULLS FIRST):string>
+-- !query output
+abcabc
+abc abc  x
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM 
(VALUES ('abc  '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, unicode_rtrim) ASC 
NULLS FIRST):string>
+-- !query output
+abc  abc abcabc
+x
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 
COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"collate(c1, utf8_lcase)\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"collate(c1, utf8_binary)\""
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/listagg.sql.out 
b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out
new file mode 100644
index 000000000000..ef580704992c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out
@@ -0,0 +1,368 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct<string_agg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, |):string>
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct<listagg(a, NULL):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct<listagg(DISTINCT a, NULL):string>
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a ASC NULLS FIRST):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST):string>
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER 
(PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING):string>
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b ASC NULLS FIRST):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST):string>
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct<listagg(a, |) WITHIN GROUP (ORDER BY b DESC NULLS LAST):string>
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a ASC NULLS 
FIRST):string>
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a DESC NULLS 
LAST):string>
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL):binary>
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL):binary>
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, X'42'):binary>
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct<listagg(a, NULL):string,listagg(b, ,):string>
+-- !query output
+123    true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"c1\"",
+    "inputType" : "\"ARRAY<STRING>\"",
+    "paramIndex" : "first",
+    "requiredType" : "(\"STRING\" or \"BINARY\")",
+    "sqlExpr" : "\"listagg(c1, NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 18,
+    "fragment" : "listagg(c1)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "dataType" : "(\"BINARY\" or \"STRING\")",
+    "functionName" : "`listagg`",
+    "sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"a\"",
+    "inputName" : "`delimiter`",
+    "inputType" : "\"STRING\"",
+    "sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 20,
+    "fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW):string>
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 61,
+    "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 64,
+    "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 44,
+    "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"a\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"b\""
+  }
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+  "sqlState" : "42K0K",
+  "messageParameters" : {
+    "funcArg" : "\"a\"",
+    "funcName" : "`listagg`",
+    "orderingExpr" : "\"a\", \"b\""
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/mode.sql.out 
b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
index ad7d59eeb163..70f253066d4f 100644
--- a/sql/core/src/test/resources/sql-tests/results/mode.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
@@ -51,7 +51,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -373,7 +373,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -397,7 +397,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`mode`"
@@ -421,7 +421,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "actualNum" : "1",
diff --git a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out 
b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
index cd95eee186e1..55aaa8ee7378 100644
--- a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
@@ -222,7 +222,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -246,7 +246,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -324,7 +324,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -348,7 +348,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "funcName" : "`percentile_cont`"
@@ -372,7 +372,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
   "sqlState" : "42K0K",
   "messageParameters" : {
     "actualNum" : "2",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6348e5f31539..ad80dc65926b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -621,6 +621,41 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("listagg function") {
+    // normal case
+    val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("listagg(a)", "listagg(b)"),
+      Seq(Row("abc", "bcd"))
+    )
+    checkAnswer(
+      df.select(listagg($"a"), listagg($"b")),
+      Seq(Row("abc", "bcd"))
+    )
+
+    // distinct case
+    val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+    checkAnswer(
+      df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+      Seq(Row("ab", "bd"))
+    )
+
+    // null case
+    val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null, 
null)).toDF("a", "b", "c")
+    checkAnswer(
+      df3.select(listagg_distinct($"a"), listagg($"a"), 
listagg_distinct($"b"), listagg($"b"),
+        listagg($"c")),
+      Seq(Row("a", "aa", "b", "bb", null))
+    )
+
+    // custom delimiter
+    val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+    checkAnswer(
+      df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+      Seq(Row("a|b|c", "b|c|d"))
+    )
+  }
+
   test("SPARK-31500: collect_set() of BinaryType returns duplicate elements") {
     val bytesTest1 = "test1".getBytes
     val bytesTest2 = "test2".getBytes
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 975a82e26f4e..4494057b1eef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -73,7 +73,9 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
       "sum_distinct", // equivalent to sum(distinct foo)
       "typedLit", "typedlit", // Scala only
       "udaf", "udf", // create function statement in sql
-      "call_function" // moot in SQL as you just call the function directly
+      "call_function", // moot in SQL as you just call the function directly
+      "listagg_distinct", // equivalent to listagg(distinct foo)
+      "string_agg_distinct" // equivalent to string_agg(distinct foo)
     )
 
     val excludedSqlFunctions = Set.empty[String]
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 662f43fc0039..283454ad273e 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -104,6 +104,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite 
with SharedThriftServ
     "timestampNTZ/datetime-special-ansi.sql",
     // SPARK-47264
     "collations.sql",
+    "listagg-collations.sql",
     "pipe-operators.sql",
     // VARIANT type
     "variant/named-function-arguments.sql"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to