This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4b97e11b53c2 [SPARK-42746][SQL] Implement LISTAGG function
4b97e11b53c2 is described below
commit 4b97e11b53c22791f186f514694d0bddab92ab64
Author: Mikhail Nikoliukin <[email protected]>
AuthorDate: Fri Nov 29 21:32:20 2024 +0800
[SPARK-42746][SQL] Implement LISTAGG function
### What changes were proposed in this pull request?
Implement new aggregation function `listagg([ALL | DISTINCT] expr[, sep])
[WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])]`
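For illustration, a minimal end-to-end sketch of the proposed syntax, run through the Scala API (the inline data and column names here are made up):

```scala
// listagg(DISTINCT ...) WITHIN GROUP requires the ordering key to match the input.
spark.sql("""
  SELECT dept,
         listagg(DISTINCT name, ', ') WITHIN GROUP (ORDER BY name) AS names
  FROM VALUES ('eng', 'bob'), ('eng', 'alice'), ('eng', 'bob') AS t(dept, name)
  GROUP BY dept
""").show()
// expected single row: dept = eng, names = "alice, bob"
```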
### Why are the changes needed?
Listagg is a popular function implemented by many other vendors. For now,
users have to use workarounds like
[this](https://kb.databricks.com/sql/recreate-listagg-functionality-with-spark-sql).
This PR closes that gap.
### Does this PR introduce _any_ user-facing change?
Yes, the new `listagg` function. BigQuery and PostgreSQL have the same
function under the name `string_agg`, so I added it as an alias.
### How was this patch tested?
With new unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: GitHub Copilot
Closes #48748 from mikhailnik-db/SPARK-42746-add-listagg.
Lead-authored-by: Mikhail Nikoliukin <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/unsafe/types/ByteArray.java | 36 +-
.../apache/spark/unsafe/array/ByteArraySuite.java | 55 +++
.../src/main/resources/error/error-conditions.json | 51 +--
python/pyspark/sql/tests/test_functions.py | 8 +-
.../scala/org/apache/spark/sql/functions.scala | 71 ++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 3 +
.../sql/catalyst/analysis/CheckAnalysis.scala | 15 +-
.../sql/catalyst/analysis/FunctionRegistry.scala | 2 +
.../sql/catalyst/analysis/FunctionResolution.scala | 17 +-
.../sql/catalyst/expressions/aggregate/Mode.scala | 4 +-
.../aggregate/SupportsOrderingWithinGroup.scala | 21 +-
.../catalyst/expressions/aggregate/collect.scala | 298 +++++++++++++-
.../expressions/aggregate/percentiles.scala | 12 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 36 +-
.../spark/sql/execution/SparkStrategies.scala | 7 +-
.../sql-functions/sql-expression-schema.md | 2 +
.../analyzer-results/listagg-collations.sql.out | 86 ++++
.../sql-tests/analyzer-results/listagg.sql.out | 435 +++++++++++++++++++++
.../sql-tests/analyzer-results/mode.sql.out | 8 +-
.../sql-tests/analyzer-results/percentiles.sql.out | 10 +-
.../sql-tests/inputs/listagg-collations.sql | 12 +
.../test/resources/sql-tests/inputs/listagg.sql | 38 ++
.../sql-tests/results/listagg-collations.sql.out | 82 ++++
.../resources/sql-tests/results/listagg.sql.out | 368 +++++++++++++++++
.../test/resources/sql-tests/results/mode.sql.out | 8 +-
.../sql-tests/results/percentiles.sql.out | 10 +-
.../apache/spark/sql/DataFrameAggregateSuite.scala | 35 ++
.../apache/spark/sql/DataFrameFunctionsSuite.scala | 4 +-
.../thriftserver/ThriftServerQueryTestSuite.scala | 1 +
29 files changed, 1655 insertions(+), 80 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
index aae47aa96320..f12408fb4931 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
@@ -135,27 +135,57 @@ public final class ByteArray {
return Arrays.copyOfRange(bytes, start, end);
}
+ /**
+ * Concatenate multiple byte arrays into one.
+ * If one of the inputs is null then null will be returned.
+ *
+ * @param inputs byte arrays to concatenate
+ * @return the concatenated byte array or null if one of the arguments is null
+ */
public static byte[] concat(byte[]... inputs) {
+ return concatWS(EMPTY_BYTE, inputs);
+ }
+
+ /**
+ * Concatenate multiple byte arrays with a given delimiter.
+ * If the delimiter or one of the inputs is null then null will be returned.
+ *
+ * @param delimiter byte array to be placed between each input
+ * @param inputs byte arrays to concatenate
+ * @return the concatenated byte array or null if one of the arguments is null
+ */
+ public static byte[] concatWS(byte[] delimiter, byte[]... inputs) {
+ if (delimiter == null) {
+ return null;
+ }
// Compute the total length of the result
long totalLength = 0;
for (byte[] input : inputs) {
if (input != null) {
- totalLength += input.length;
+ totalLength += input.length + delimiter.length;
} else {
return null;
}
}
-
+ if (totalLength > 0) totalLength -= delimiter.length;
// Allocate a new byte array, and copy the inputs one by one into it
final byte[] result = new byte[Ints.checkedCast(totalLength)];
int offset = 0;
- for (byte[] input : inputs) {
+ for (int i = 0; i < inputs.length; i++) {
+ byte[] input = inputs[i];
int len = input.length;
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET,
result, Platform.BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
+ if (delimiter.length > 0 && i < inputs.length - 1) {
+ Platform.copyMemory(
+ delimiter, Platform.BYTE_ARRAY_OFFSET,
+ result, Platform.BYTE_ARRAY_OFFSET + offset,
+ delimiter.length);
+ offset += delimiter.length;
+ }
}
return result;
}
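As a quick illustration of the new helper's contract (a sketch against the patched class, not part of the diff): the delimiter is inserted between neighboring inputs, including empty ones, and a null delimiter or any null input yields null.

```scala
import org.apache.spark.unsafe.types.ByteArray

// [1] + delimiter(42) + [2] -> [1, 42, 2]
ByteArray.concatWS(Array[Byte](42), Array[Byte](1), Array[Byte](2))
// An empty input still contributes a delimiter slot: [1] + 42 + [] -> [1, 42]
ByteArray.concatWS(Array[Byte](42), Array[Byte](1), Array.emptyByteArray)
// A null delimiter or any null input short-circuits to null.
ByteArray.concatWS(null, Array[Byte](1), Array[Byte](2))
```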
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
index aff619175ff7..5e221b4e359d 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/ByteArraySuite.java
@@ -67,4 +67,59 @@ public class ByteArraySuite {
byte[] y4 = new byte[]{(byte) 100, (byte) 200};
Assertions.assertEquals(0, ByteArray.compareBinary(x4, y4));
}
+
+ @Test
+ public void testConcat() {
+ byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+ byte[] result1 = ByteArray.concat(x1, y1);
+ byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6};
+ Assertions.assertArrayEquals(expected1, result1);
+
+ byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y2 = new byte[0];
+ byte[] result2 = ByteArray.concat(x2, y2);
+ byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ Assertions.assertArrayEquals(expected2, result2);
+
+ byte[] x3 = new byte[0];
+ byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+ byte[] result3 = ByteArray.concat(x3, y3);
+ byte[] expected3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+ Assertions.assertArrayEquals(expected3, result3);
+
+ byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y4 = null;
+ byte[] result4 = ByteArray.concat(x4, y4);
+ Assertions.assertArrayEquals(null, result4);
+ }
+
+ @Test
+ public void testConcatWS() {
+ byte[] separator = new byte[]{(byte) 42};
+
+ byte[] x1 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y1 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+ byte[] result1 = ByteArray.concatWS(separator, x1, y1);
+ byte[] expected1 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42,
+ (byte) 4, (byte) 5, (byte) 6};
+ Assertions.assertArrayEquals(expected1, result1);
+
+ byte[] x2 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y2 = new byte[0];
+ byte[] result2 = ByteArray.concatWS(separator, x2, y2);
+ byte[] expected2 = new byte[]{(byte) 1, (byte) 2, (byte) 3, (byte) 42};
+ Assertions.assertArrayEquals(expected2, result2);
+
+ byte[] x3 = new byte[0];
+ byte[] y3 = new byte[]{(byte) 4, (byte) 5, (byte) 6};
+ byte[] result3 = ByteArray.concatWS(separator, x3, y3);
+ byte[] expected3 = new byte[]{(byte) 42, (byte) 4, (byte) 5, (byte) 6};
+ Assertions.assertArrayEquals(expected3, result3);
+
+ byte[] x4 = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+ byte[] y4 = null;
+ byte[] result4 = ByteArray.concatWS(separator, x4, y4);
+ Assertions.assertArrayEquals(null, result4);
+ }
}
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 77437f6c5617..024caf86cf94 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2627,29 +2627,6 @@
],
"sqlState" : "22006"
},
- "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : {
- "message" : [
- "Invalid inverse distribution function <funcName>."
- ],
- "subClass" : {
- "DISTINCT_UNSUPPORTED" : {
- "message" : [
- "Cannot use DISTINCT with WITHIN GROUP."
- ]
- },
- "WITHIN_GROUP_MISSING" : {
- "message" : [
- "WITHIN GROUP is required for inverse distribution function."
- ]
- },
- "WRONG_NUM_ORDERINGS" : {
- "message" : [
- "Requires <expectedNum> orderings in WITHIN GROUP but got <actualNum>."
- ]
- }
- },
- "sqlState" : "42K0K"
- },
"INVALID_JAVA_IDENTIFIER_AS_FIELD_NAME" : {
"message" : [
"<fieldName> is not a valid identifier of Java and cannot be used as field name",
@@ -3364,6 +3341,34 @@
],
"sqlState" : "42601"
},
+ "INVALID_WITHIN_GROUP_EXPRESSION" : {
+ "message" : [
+ "Invalid function <funcName> with WITHIN GROUP."
+ ],
+ "subClass" : {
+ "DISTINCT_UNSUPPORTED" : {
+ "message" : [
+ "The function does not support DISTINCT with WITHIN GROUP."
+ ]
+ },
+ "MISMATCH_WITH_DISTINCT_INPUT" : {
+ "message" : [
+ "The function is invoked with DISTINCT and WITHIN GROUP but expressions <funcArg> and <orderingExpr> do not match. The WITHIN GROUP ordering expression must be picked from the function inputs."
+ },
+ "WITHIN_GROUP_MISSING" : {
+ "message" : [
+ "WITHIN GROUP is required for the function."
+ ]
+ },
+ "WRONG_NUM_ORDERINGS" : {
+ "message" : [
+ "The function requires <expectedNum> orderings in WITHIN GROUP but got <actualNum>."
+ ]
+ }
+ },
+ "sqlState" : "42K0K"
+ },
"INVALID_WRITER_COMMIT_MESSAGE" : {
"message" : [
"The data source writer has generated an invalid number of commit messages. Expected exactly one writer commit message from each task, but received <detail>."
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index cf8f685ea449..6c7ce8007292 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -83,7 +83,13 @@ class FunctionsTestsMixin:
missing_in_py = jvm_fn_set.difference(py_fn_set)
# Functions that we expect to be missing in python until they are added to pyspark
- expected_missing_in_py = set()
+ expected_missing_in_py = {
+ # TODO(SPARK-50220): listagg functions will soon be added and removed from this list
+ "listagg_distinct",
+ "listagg",
+ "string_agg",
+ "string_agg_distinct",
+ }
self.assertEqual(
expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 2a04212ee258..9f509fa843a2 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1147,6 +1147,77 @@ object functions {
*/
def sum_distinct(e: Column): Column = Column.fn("sum", isDistinct = true, e)
+ /**
+ * Aggregate function: returns the concatenation of non-null input values.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def listagg(e: Column): Column = Column.fn("listagg", e)
+
+ /**
+ * Aggregate function: returns the concatenation of non-null input values, separated by the
+ * delimiter.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def listagg(e: Column, delimiter: Column): Column = Column.fn("listagg", e, delimiter)
+
+ /**
+ * Aggregate function: returns the concatenation of distinct non-null input values.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def listagg_distinct(e: Column): Column = Column.fn("listagg", isDistinct = true, e)
+
+ /**
+ * Aggregate function: returns the concatenation of distinct non-null input values, separated by
+ * the delimiter.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def listagg_distinct(e: Column, delimiter: Column): Column =
+ Column.fn("listagg", isDistinct = true, e, delimiter)
+
+ /**
+ * Aggregate function: returns the concatenation of non-null input values. Alias for `listagg`.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def string_agg(e: Column): Column = Column.fn("string_agg", e)
+
+ /**
+ * Aggregate function: returns the concatenation of non-null input values, separated by the
+ * delimiter. Alias for `listagg`.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def string_agg(e: Column, delimiter: Column): Column = Column.fn("string_agg", e, delimiter)
+
+ /**
+ * Aggregate function: returns the concatenation of distinct non-null input values. Alias for
+ * `listagg`.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def string_agg_distinct(e: Column): Column = Column.fn("string_agg", isDistinct = true, e)
+
+ /**
+ * Aggregate function: returns the concatenation of distinct non-null input values, separated by
+ * the delimiter. Alias for `listagg`.
+ *
+ * @group agg_funcs
+ * @since 4.0.0
+ */
+ def string_agg_distinct(e: Column, delimiter: Column): Column =
+ Column.fn("string_agg", isDistinct = true, e, delimiter)
+
/**
* Aggregate function: alias for `var_samp`.
*
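A rough usage sketch of the new Scala API (column names invented; assumes a SparkSession named `spark`):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // for toDF on local collections

val df = Seq("b", "a", "b").toDF("value")
// Without WITHIN GROUP the concatenation order is non-deterministic.
df.agg(listagg($"value", lit("|"))).show()
// The *_distinct variants drop duplicate inputs; string_agg is the PostgreSQL-style alias.
df.agg(string_agg_distinct($"value", lit(","))).show()
```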
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8e1b9da927c9..3af3565220bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2772,6 +2772,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ne
case e: Expression if e.foldable =>
e // No need to create an attribute reference if it will be evaluated as a Literal.
+ case e: SortOrder =>
+ // For SortOrder, just recursively extract from the child expression.
+ e.copy(child = extractExpr(e.child))
case e: NamedArgumentExpression =>
// For NamedArgumentExpression, we extract the value and replace it with
// an AttributeReference (with an internal column name, e.g. "_w0").
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 573619af1b5f..1c76fd7d00f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Median, PercentileCont, PercentileDisc}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -423,10 +423,23 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
"funcName" -> toSQLExpr(wf),
"windowExpr" -> toSQLExpr(w)))
+ case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _)
+ if agg.isDistinct && listAgg.needSaveOrderValue =>
+ throw QueryCompilationErrors.functionAndOrderExpressionMismatchError(
+ listAgg.prettyName, listAgg.child, listAgg.orderExpressions)
+
case w: WindowExpression =>
// Only allow window functions with an aggregate expression or an offset window
// function or a Pandas window UDF.
w.windowFunction match {
+ case agg @ AggregateExpression(fun: ListAgg, _, _, _, _)
+ // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is unsupported
+ if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty ||
+ w.windowSpec.frameSpecification !=
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) =>
+ agg.failAnalysis(
+ errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+ messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)))
case agg @ AggregateExpression(
_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5103f8048856..d9e9f49ce065 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -506,6 +506,8 @@ object FunctionRegistry {
expression[CollectList]("collect_list"),
expression[CollectList]("array_agg", true, Some("3.3.0")),
expression[CollectSet]("collect_set"),
+ expression[ListAgg]("listagg"),
+ expression[ListAgg]("string_agg", setAlias = true),
expressionBuilder("count_min_sketch", CountMinSketchAggExpressionBuilder),
expression[BoolAnd]("every", true),
expression[BoolAnd]("bool_and"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 5a27a7219032..800126e0030e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -128,18 +128,15 @@ class FunctionResolution(
numArgs: Int,
u: UnresolvedFunction): Expression = {
func match {
- case owg: SupportsOrderingWithinGroup if u.isDistinct =>
- throw QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
- owg.prettyName
- )
+ case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && u.isDistinct =>
+ throw QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError(owg.prettyName)
case owg: SupportsOrderingWithinGroup
- if !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
- throw QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
- owg.prettyName
- )
+ if owg.isOrderingMandatory && !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
+ throw QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName)
case owg: SupportsOrderingWithinGroup
if owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
- throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+ // e.g. mode(expr1) within group (order by expr2) is not supported
+ throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
owg.prettyName,
0,
u.orderingWithinGroup.length
@@ -198,7 +195,7 @@ class FunctionResolution(
case agg: AggregateFunction =>
// Note: PythonUDAF does not support these advanced clauses.
if (agg.isInstanceOf[PythonUDAF]) checkUnsupportedAggregateClause(agg, u)
- // After parse, the inverse distribution functions not set the ordering within group yet.
+ // After parsing, the functions have not set the ordering within group yet.
val newAgg = agg match {
case owg: SupportsOrderingWithinGroup
if !owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index 97add0b8e45b..f3eeaa96b3d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -183,6 +183,8 @@ case class Mode(
}
override def orderingFilled: Boolean = child != UnresolvedWithinGroup
+ override def isOrderingMandatory: Boolean = true
+ override def isDistinctSupported: Boolean = false
assert(orderingFilled || (!orderingFilled && reverseOpt.isEmpty))
@@ -190,7 +192,7 @@ case class Mode(
child match {
case UnresolvedWithinGroup =>
if (orderingWithinGroup.length != 1) {
- throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+ throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
index 9c0502a2c1fc..453251ac61cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
@@ -20,9 +20,26 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.sql.catalyst.expressions.SortOrder
/**
- * The trait used to set the [[SortOrder]] after inverse distribution functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
*/
trait SupportsOrderingWithinGroup { self: AggregateFunction =>
- def orderingFilled: Boolean = false
def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction
+
+ /** Indicator that ordering was set. */
+ def orderingFilled: Boolean
+
+ /**
+ * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for the function.
+ *
+ * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+ */
+ def isOrderingMandatory: Boolean
+
+ /**
+ * Tells Analyzer that DISTINCT is supported.
+ * DISTINCT can conflict with the ordering, so some functions disallow it.
+ *
+ * @see [[QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError]]
+ */
+ def isDistinctSupported: Boolean
}
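A condensed sketch of how the analyzer consults this trait (a paraphrase of the FunctionResolution changes above, with simplified signatures, assuming the catalyst types are in scope):

```scala
// owg: the parsed aggregate mixing in SupportsOrderingWithinGroup
// isDistinct / orderingWithinGroup: what the user wrote in the query
def validate(
    owg: AggregateFunction with SupportsOrderingWithinGroup,
    isDistinct: Boolean,
    orderingWithinGroup: Seq[SortOrder]): Unit = {
  if (!owg.isDistinctSupported && isDistinct) {
    // e.g. mode(DISTINCT x) WITHIN GROUP (...) -> DISTINCT_UNSUPPORTED
  } else if (owg.isOrderingMandatory && !owg.orderingFilled && orderingWithinGroup.isEmpty) {
    // e.g. percentile_cont(0.5) without WITHIN GROUP -> WITHIN_GROUP_MISSING
  }
}
```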
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 3aaf353043a9..7789c23b50a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -18,16 +18,22 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
import scala.collection.mutable
-import scala.collection.mutable.Growable
+import scala.collection.mutable.{ArrayBuffer, Growable}
+import scala.util.{Left, Right}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, TypeUtils, UnsafeRowUtils}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
+import org.apache.spark.sql.internal.types.StringTypeWithCollation
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
import org.apache.spark.util.BoundedPriorityQueue
/**
@@ -36,8 +42,7 @@ import org.apache.spark.util.BoundedPriorityQueue
* We have to store all the collected elements in memory, and so notice that too many elements
* can cause GC pauses and eventually OutOfMemory errors.
*/
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
- with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {
val child: Expression
@@ -102,7 +107,8 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper
case class CollectList(
child: Expression,
mutableAggBufferOffset: Int = 0,
- inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]] {
+ inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
+ with UnaryLike[Expression] {
def this(child: Expression) = this(child, 0, 0)
@@ -149,7 +155,7 @@ case class CollectSet(
child: Expression,
mutableAggBufferOffset: Int = 0,
inputAggBufferOffset: Int = 0)
- extends Collect[mutable.HashSet[Any]] with QueryErrorsBase {
+ extends Collect[mutable.HashSet[Any]] with QueryErrorsBase with UnaryLike[Expression] {
def this(child: Expression) = this(child, 0, 0)
@@ -215,7 +221,8 @@ case class CollectTopK(
num: Int,
reverse: Boolean = false,
mutableAggBufferOffset: Int = 0,
- inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]] {
+ inputAggBufferOffset: Int = 0) extends Collect[BoundedPriorityQueue[Any]]
+ with UnaryLike[Expression] {
assert(num > 0)
def this(child: Expression, num: Int) = this(child, num, false, 0, 0)
@@ -265,3 +272,280 @@ private[aggregate] object CollectTopK {
case _ => throw QueryCompilationErrors.invalidNumParameter(e)
}
}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])] - Returns
+ the concatenation of non-null input values, separated by the delimiter, ordered by key.
+ If all values are null, null is returned.
+ """,
+ arguments = """
+ Arguments:
+ * expr - a string or binary expression to be concatenated.
+ * delimiter - an optional string or binary foldable expression used to separate the input values.
+ If null, the concatenation will be performed without a delimiter. Default is null.
+ * key - an optional expression for ordering the input values. Multiple keys can be specified.
+ If none are specified, the order of the rows in the result is non-deterministic.
+ """,
+ examples = """
+ Examples:
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+ abc
+ > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+ cba
+ > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col);
+ ab
+ > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+ aa
+ > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
+ ab
+ > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+ a, b, c
+ > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+ NULL
+ """,
+ note = """
+ * If the order is not specified, the function is non-deterministic because
+ the order of the rows may be non-deterministic after a shuffle.
+ * If DISTINCT is specified, then expr and key must be the same expression.
+ """,
+ group = "agg_funcs",
+ since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ListAgg(
+ child: Expression,
+ delimiter: Expression = Literal(null),
+ orderExpressions: Seq[SortOrder] = Nil,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends Collect[mutable.ArrayBuffer[Any]]
+ with SupportsOrderingWithinGroup
+ with ImplicitCastInputTypes {
+
+ override def orderingFilled: Boolean = orderExpressions.nonEmpty
+
+ override def isOrderingMandatory: Boolean = false
+
+ override def isDistinctSupported: Boolean = true
+
+ override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction =
+ copy(orderExpressions = orderingWithinGroup)
+
+ override protected lazy val bufferElementType: DataType = {
+ if (!needSaveOrderValue) {
+ child.dataType
+ } else {
+ StructType(
+ StructField("value", child.dataType)
+ +: orderValuesField
+ )
+ }
+ }
+ /** Indicates that the result of [[child]] is not enough for evaluation */
+ lazy val needSaveOrderValue: Boolean = !isOrderCompatible(orderExpressions)
+
+ def this(child: Expression) =
+ this(child, Literal(null), Nil, 0, 0)
+
+ def this(child: Expression, delimiter: Expression) =
+ this(child, delimiter, Nil, 0, 0)
+
+ override def nullable: Boolean = true
+
+ override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty
+
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType))
+
+ override def sql(isDistinct: Boolean): String = {
+ val distinct = if (isDistinct) "DISTINCT " else ""
+ val withinGroup = if (orderingFilled) {
+ s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})"
+ } else {
+ ""
+ }
+ s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup"
+ }
+
+ override def inputTypes: Seq[AbstractDataType] =
+ TypeCollection(
+ StringTypeWithCollation(supportsTrimCollation = true),
+ BinaryType
+ ) +:
+ TypeCollection(
+ StringTypeWithCollation(supportsTrimCollation = true),
+ BinaryType,
+ NullType
+ ) +:
+ orderExpressions.map(_ => AnyDataType)
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val matchInputTypes = super.checkInputDataTypes()
+ if (matchInputTypes.isFailure) {
+ matchInputTypes
+ } else if (!delimiter.foldable) {
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> toSQLId("delimiter"),
+ "inputType" -> toSQLType(delimiter.dataType),
+ "inputExpr" -> toSQLExpr(delimiter)
+ )
+ )
+ } else if (delimiter.dataType == NullType) {
+ // null is the default empty delimiter so type is not important
+ TypeCheckSuccess
+ } else {
+ TypeUtils.checkForSameTypeInputExpr(child.dataType :: delimiter.dataType :: Nil, prettyName)
+ }
+ }
+
+ override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
+ if (buffer.nonEmpty) {
+ val sortedBufferWithoutNulls = sortBuffer(buffer)
+ concatSkippingNulls(sortedBufferWithoutNulls)
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Sorts the buffer according to orderExpressions.
+ * If orderExpressions is empty then the buffer is returned as is.
+ * The format of the buffer is determined by [[needSaveOrderValue]].
+ * @return sorted buffer containing only the child's values
+ */
+ private[this] def sortBuffer(buffer: mutable.ArrayBuffer[Any]): mutable.ArrayBuffer[Any] = {
+ if (!orderingFilled) {
+ // without order return as is.
+ return buffer
+ }
+ if (!needSaveOrderValue) {
+ // Here the buffer has structure [childValue0, childValue1, ...]
+ // and we want to sort it by childValues
+ val sortOrderExpression = orderExpressions.head
+ val ascendingOrdering = PhysicalDataType.ordering(sortOrderExpression.dataType)
+ val ordering =
+ if (sortOrderExpression.direction == Ascending) ascendingOrdering
+ else ascendingOrdering.reverse
+ buffer.sorted(ordering)
+ } else {
+ // Here the buffer has structure
+ // [[childValue, orderValue0, orderValue1, ...],
+ // [childValue, orderValue0, orderValue1, ...],
+ // ...]
+ // and we want to sort it by tuples (orderValue0, orderValue1, ...)
+ buffer
+ .asInstanceOf[mutable.ArrayBuffer[InternalRow]]
+ .sorted(bufferOrdering)
+ // drop orderValues after sort
+ .map(_.get(0, child.dataType))
+ }
+ }
+
+ /**
+ * @return ordering by (orderValue0, orderValue1, ...)
+ * for InternalRow with format [childValue, orderValue0, orderValue1, ...]
+ */
+ private[this] def bufferOrdering: Ordering[InternalRow] = {
+ val bufferSortOrder = orderExpressions.zipWithIndex.map {
+ case (originalOrder, i) =>
+ originalOrder.copy(
+ // first value is the evaluated child so add +1 for order's values
+ child = BoundReference(i + 1, originalOrder.dataType, originalOrder.child.nullable)
+ )
+ }
+ new InterpretedOrdering(bufferSortOrder)
+ }
+
+ private[this] def concatSkippingNulls(buffer: mutable.ArrayBuffer[Any]): Any = {
+ getDelimiterValue match {
+ case Right(delimiterValue: Array[Byte]) =>
+ val inputs = buffer.filter(_ != null).map(_.asInstanceOf[Array[Byte]])
+ ByteArray.concatWS(delimiterValue, inputs.toSeq: _*)
+ case Left(delimiterValue: UTF8String) =>
+ val inputs = buffer.filter(_ != null).map(_.asInstanceOf[UTF8String])
+ UTF8String.concatWs(delimiterValue, inputs.toSeq: _*)
+ }
+ }
+
+ /**
+ * @return delimiter value or default empty value if delimiter is null. Type respects [[dataType]]
+ */
+ private[this] def getDelimiterValue: Either[UTF8String, Array[Byte]] = {
+ val delimiterValue = delimiter.eval()
+ dataType match {
+ case _: StringType =>
+ Left(
+ if (delimiterValue == null) UTF8String.fromString("")
+ else delimiterValue.asInstanceOf[UTF8String]
+ )
+ case _: BinaryType =>
+ Right(
+ if (delimiterValue == null) ByteArray.EMPTY_BYTE
+ else delimiterValue.asInstanceOf[Array[Byte]]
+ )
+ }
+ }
+
+ override def dataType: DataType = child.dataType
+
+ override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
+ val value = child.eval(input)
+ if (value != null) {
+ val v = if (!needSaveOrderValue) {
+ convertToBufferElement(value)
+ } else {
+ InternalRow.fromSeq(convertToBufferElement(value) +: evalOrderValues(input))
+ }
+ buffer += v
+ }
+ buffer
+ }
+
+ private[this] def evalOrderValues(internalRow: InternalRow): Seq[Any] = {
+ orderExpressions.map(order => convertToBufferElement(order.child.eval(internalRow)))
+ }
+
+ override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
+
+ override def children: Seq[Expression] = child +: delimiter +: orderExpressions
+
+ /**
+ * Utility function to check if the given order is defined and different from [[child]].
+ *
+ * @see [[QueryCompilationErrors.functionAndOrderExpressionMismatchError]]
+ * @see [[needSaveOrderValue]]
+ */
+ private[this] def isOrderCompatible(someOrder: Seq[SortOrder]): Boolean = {
+ if (someOrder.isEmpty) {
+ return true
+ }
+ if (someOrder.size == 1 && someOrder.head.child.semanticEquals(child)) {
+ return true
+ }
+ false
+ }
+
+ override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+ copy(
+ child = newChildren.head,
+ delimiter = newChildren(1),
+ orderExpressions = newChildren
+ .drop(2)
+ .map(_.asInstanceOf[SortOrder])
+ )
+
+ private[this] def orderValuesField: Seq[StructField] = {
+ orderExpressions.zipWithIndex.map {
+ case (order, i) => StructField(s"sortOrderValue[$i]", order.dataType)
+ }
+ }
+}
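For intuition, the two buffer shapes implied by `needSaveOrderValue` (an illustrative sketch, not code from the patch; values are schematic, whereas real buffers hold catalyst formats such as UTF8String):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow

// needSaveOrderValue == false (the order key is the child itself):
//   the buffer holds bare values and is sorted directly at eval time.
val flat: ArrayBuffer[Any] = ArrayBuffer("b", "a", "c")

// needSaveOrderValue == true (order keys differ from the child):
//   each entry is a row [childValue, orderValue0, orderValue1, ...];
//   eval sorts by the order values, then keeps only field 0.
val nested: ArrayBuffer[Any] = ArrayBuffer(
  InternalRow("a", 2, "x"),
  InternalRow("c", 1, "y"))
```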
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 89a6984b8085..6dfa1b499df2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -378,7 +378,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = {
if (orderingWithinGroup.length != 1) {
- throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+ throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
@@ -390,6 +390,10 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): PercentileCont =
this.copy(left = newLeft, right = newRight)
+
+ override def orderingFilled: Boolean = left != UnresolvedWithinGroup
+ override def isOrderingMandatory: Boolean = true
+ override def isDistinctSupported: Boolean = false
}
/**
@@ -432,7 +436,7 @@ case class PercentileDisc(
override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = {
if (orderingWithinGroup.length != 1) {
- throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+ throw QueryCompilationErrors.wrongNumOrderingsForFunctionError(
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
@@ -467,6 +471,10 @@ case class PercentileDisc(
toDoubleValue(higherKey)
}
}
+
+ override def orderingFilled: Boolean = left != UnresolvedWithinGroup
+ override def isOrderingMandatory: Boolean = true
+ override def isDistinctSupported: Boolean = false
}
// scalastyle:off line.size.limit
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 03471ae8a3da..7d7a490c9790 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentif
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, GroupingID, NamedExpression, SortOrder, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, InputParameter, Join, LogicalPlan, SerdeInfo, Window}
@@ -725,28 +725,32 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"windowExpr" -> toSQLExpr(windowExpr)))
}
- def distinctInverseDistributionFunctionUnsupportedError(funcName: String): Throwable = {
+ def distinctWithOrderingFunctionUnsupportedError(funcName: String): Throwable = {
new AnalysisException(
- errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
- messageParameters = Map("funcName" -> toSQLId(funcName)))
+ errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
+ messageParameters = Map("funcName" -> toSQLId(funcName))
+ )
}
- def inverseDistributionFunctionMissingWithinGroupError(funcName: String): Throwable = {
+ def functionMissingWithinGroupError(funcName: String): Throwable = {
new AnalysisException(
- errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
- messageParameters = Map("funcName" -> toSQLId(funcName)))
+ errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
+ messageParameters = Map("funcName" -> toSQLId(funcName))
+ )
}
- def wrongNumOrderingsForInverseDistributionFunctionError(
+ def wrongNumOrderingsForFunctionError(
funcName: String,
validOrderingsNumber: Int,
actualOrderingsNumber: Int): Throwable = {
new AnalysisException(
- errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+ errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
+ messageParameters = Map(
"funcName" -> toSQLId(funcName),
"expectedNum" -> validOrderingsNumber.toString,
- "actualNum" -> actualOrderingsNumber.toString))
+ "actualNum" -> actualOrderingsNumber.toString
+ )
+ )
}
def aliasNumberNotMatchColumnNumberError(
@@ -1049,6 +1053,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"operation" -> operation))
}
+ def functionAndOrderExpressionMismatchError(
+ functionName: String,
+ functionArg: Expression,
+ orderExpr: Seq[SortOrder]): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ messageParameters = Map(
+ "funcName" -> toSQLId(functionName),
+ "funcArg" -> toSQLExpr(functionArg),
+ "orderingExpr" -> orderExpr.map(order => toSQLExpr(order.child)).mkString(", ")))
+ }
+
def wrongCommandForObjectTypeError(
operation: String,
requiredType: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c621c151c0bd..e77c050fe887 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -607,7 +607,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed because those two distinct
// aggregates have different column expressions.
val distinctExpressions =
- functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
+ functionsWithDistinct.head.aggregateFunction.children
+ .filterNot(_.foldable)
+ .map {
+ case s: SortOrder => s.child
+ case e => e
+ }
val normalizedNamedDistinctExpressions = distinctExpressions.map { e =>
// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here
// because `distinctExpressions` is not extracted during logical phase.
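For context, a sketch (not part of the patch) of why the SortOrder unwrapping above is needed:

```scala
// Children of listagg(DISTINCT a, ',') WITHIN GROUP (ORDER BY a), roughly:
//   Seq(a, Literal(","), SortOrder(a, Ascending))
// filterNot(_.foldable) drops the literal delimiter; the map unwraps the
// SortOrder, so distinct planning sees only the bare distinct column `a`.
```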
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index c54e09735a9b..39cefdaa892b 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -425,6 +425,8 @@
| org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis | kurtosis | SELECT kurtosis(col) FROM VALUES (-10), (-20), (100), (1000) AS tab(col) | struct<kurtosis(col):double> |
| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last | SELECT last(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last(col):int> |
| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last_value | SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last_value(col):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | listagg | SELECT listagg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | struct<listagg(col, NULL):string> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.ListAgg | string_agg | SELECT string_agg(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col) | struct<string_agg(col, NULL):string> |
| org.apache.spark.sql.catalyst.expressions.aggregate.Max | max | SELECT max(col) FROM VALUES (10), (50), (20) AS tab(col) | struct<max(col):int> |
| org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy | max_by | SELECT max_by(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y) | struct<max_by(x, y):string> |
| org.apache.spark.sql.catalyst.expressions.aggregate.Median | median | SELECT median(col) FROM VALUES (0), (10) AS tab(col) | struct<median(col):double> |
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out
new file mode 100644
index 000000000000..ca471858a541
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg-collations.sql.out
@@ -0,0 +1,86 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, utf8_binary) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_binary) ASC NULLS FIRST)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, utf8_lcase) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC NULLS FIRST)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_binary), null, 0, 0) AS listagg(DISTINCT collate(c1, utf8_binary), NULL)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, 0, 0) AS listagg(DISTINCT collate(c1, utf8_lcase), NULL)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, utf8_lcase), null, collate(c1#x, utf8_lcase) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT collate(c1, utf8_lcase), NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC NULLS FIRST)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('x'), ('abc')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(distinct collate(c1#x, unicode_rtrim), null, 0, 0) AS listagg(DISTINCT collate(c1, unicode_rtrim), NULL)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, c1#x ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY c1 ASC NULLS FIRST)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, collate(c1#x, unicode_rtrim) ASC NULLS FIRST, 0, 0) AS listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, unicode_rtrim) ASC NULLS FIRST)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"collate(c1, utf8_lcase)\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"collate(c1, utf8_binary)\""
+ }
+}
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out
new file mode 100644
index 000000000000..71eb3f8ca76b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/listagg.sql.out
@@ -0,0 +1,435 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b)
+-- !query analysis
+CreateViewCommand `df`, SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b), false, false, LocalTempView, UNSUPPORTED, true
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query analysis
+CreateViewCommand `df2`, SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b), false, false, LocalTempView, UNSUPPORTED, true
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [string_agg(b#x, null, 0, 0) AS string_agg(b, NULL)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query analysis
+Aggregate [listagg(b#x, null, 0, 0) AS listagg(b, NULL)#x]
++- Filter NOT (1 = 1)
+ +- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query analysis
+Aggregate [a#x], [listagg(b#x, |, 0, 0) AS listagg(b, |)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, 0, 0) AS listagg(a, NULL)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query analysis
+Aggregate [listagg(distinct a#x, null, 0, 0) AS listagg(DISTINCT a, NULL)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, a#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a ASC NULLS FIRST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query analysis
+Project [listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
++- Project [a#x, b#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x]
+ +- Window [listagg(a#x, null, a#x DESC NULLS LAST, 0, 0) windowspecdefinition(b#x, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER (PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x], [b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b ASC NULLS FIRST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, |, b#x DESC NULLS LAST, 0, 0) AS listagg(a, |) WITHIN GROUP (ORDER BY b DESC NULLS LAST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x ASC NULLS FIRST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a ASC NULLS FIRST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query analysis
+Aggregate [listagg(a#x, null, b#x DESC NULLS LAST, a#x DESC NULLS LAST, 0, 0) AS listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a DESC NULLS LAST)#x]
++- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, null, 0, 0) AS listagg(c1, NULL)#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+Aggregate [listagg(c1#x, 0x42, 0, 0) AS listagg(c1, X'42')#x]
++- SubqueryAlias t
+ +- Project [col1#x AS c1#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query analysis
+Aggregate [listagg(cast(a#x as string), null, 0, 0) AS listagg(a, NULL)#x,
listagg(cast(b#x as string), ,, 0, 0) AS listagg(b, ,)#x]
++- SubqueryAlias df2
+ +- View (`df2`, [a#x, b#x])
+ +- Project [cast(a#x as int) AS a#x, cast(b#x as boolean) AS b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c1\"",
+ "inputType" : "\"ARRAY<STRING>\"",
+ "paramIndex" : "first",
+ "requiredType" : "(\"STRING\" or \"BINARY\")",
+ "sqlExpr" : "\"listagg(c1, NULL)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 18,
+ "fragment" : "listagg(c1)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "dataType" : "(\"BINARY\" or \"STRING\")",
+ "functionName" : "`listagg`",
+ "sqlExpr" : "\"listagg(c1, , )\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 24,
+ "fragment" : "listagg(c1, ', ')"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputExpr" : "\"a\"",
+ "inputName" : "`delimiter`",
+ "inputType" : "\"STRING\"",
+ "sqlExpr" : "\"listagg(b, a)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 20,
+ "fragment" : "listagg(b, a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query analysis
+Project [listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW)#x]
++- Project [a#x, listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x, listagg(a, NULL) OVER (ORDER BY
a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x]
+ +- Window [listagg(a#x, null, 0, 0) windowspecdefinition(a#x ASC NULLS
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$()))
AS listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW)#x], [a#x ASC NULLS FIRST]
+ +- Project [a#x]
+ +- SubqueryAlias df
+ +- View (`df`, [a#x, b#x])
+ +- Project [cast(a#x as string) AS a#x, cast(b#x as string) AS
b#x]
+ +- Project [a#x, b#x]
+ +- SubqueryAlias t
+ +- Project [col1#x AS a#x, col2#x AS b#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 61,
+ "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 64,
+ "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" :
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"a\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"b\""
+ }
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" :
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"a\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"a\", \"b\""
+ }
+}
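[Editor's note on the two DISTINCT failures above: once DISTINCT is specified, every
WITHIN GROUP ordering expression must be the DISTINCT input itself, since the
aggregate can only sort values it actually keeps. A minimal sketch of the supported
shape, reusing the `df` view from these tests:

    -- ordering key matches the DISTINCT expression, so analysis succeeds
    SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a) FROM df;
]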
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
index d6ecbc72a717..8028c344140f 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
@@ -74,7 +74,7 @@ SELECT department, mode(DISTINCT salary) FROM basic_pays
GROUP BY department ORD
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -379,7 +379,7 @@ FROM basic_pays
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -401,7 +401,7 @@ FROM basic_pays
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -423,7 +423,7 @@ FROM basic_pays
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
"sqlState" : "42K0K",
"messageParameters" : {
"actualNum" : "1",
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
index 4a31cff8c7d0..eb8102afa47e 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
@@ -248,7 +248,7 @@ FROM aggr
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -270,7 +270,7 @@ FROM aggr
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -342,7 +342,7 @@ FROM aggr
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -364,7 +364,7 @@ FROM aggr
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -386,7 +386,7 @@ FROM aggr
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
"sqlState" : "42K0K",
"messageParameters" : {
"actualNum" : "2",
diff --git
a/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql
b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql
new file mode 100644
index 000000000000..35f86183c37b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/listagg-collations.sql
@@ -0,0 +1,12 @@
+-- Test cases with collations
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES
('a'), ('A'), ('b'), ('B')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES
('a'), ('A'), ('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'),
('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'),
('b'), ('B')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1
COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1);
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '),
('abc '), ('x'), ('abc')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc
'), ('abc\n'), ('abc'), ('x')) AS t(c1);
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM
(VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1);
+
+-- Error case with collations
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1
COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1);
\ No newline at end of file
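[Editor's note: the unicode_rtrim cases above exercise equality that ignores
trailing whitespace. A minimal sketch of the effect on DISTINCT, assuming the
semantics shown in the generated results file further below:

    -- 'abc ' and 'abc' compare equal under unicode_rtrim, so DISTINCT keeps one of them
    SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc')) AS t(c1);
]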
diff --git a/sql/core/src/test/resources/sql-tests/inputs/listagg.sql
b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql
new file mode 100644
index 000000000000..15c8cfa823e9
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/listagg.sql
@@ -0,0 +1,38 @@
+-- Create temporary views
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL,
NULL)) AS t(a, b);
+
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b);
+
+-- Test cases for listagg function
+SELECT listagg(b) FROM df GROUP BY a;
+SELECT string_agg(b) FROM df GROUP BY a;
+SELECT listagg(b, NULL) FROM df GROUP BY a;
+SELECT listagg(b) FROM df WHERE 1 != 1;
+SELECT listagg(b, '|') FROM df GROUP BY a;
+SELECT listagg(a) FROM df;
+SELECT listagg(DISTINCT a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df;
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df;
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(a), listagg(b, ',') FROM df2;
+
+-- Error cases
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1);
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1);
+SELECT listagg(b, a) FROM df GROUP BY a;
+SELECT listagg(a) OVER (ORDER BY a) FROM df;
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df;
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df;
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df;
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df;
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df;
\ No newline at end of file
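[Editor's note on the error cases above: `listagg(b, a)` fails because the
delimiter must be a foldable (constant) expression. A minimal sketch of the
distinction, reusing the `df` view defined at the top of this file:

    SELECT listagg(b, '; ') FROM df GROUP BY a;  -- constant delimiter: accepted
    SELECT listagg(b, a)    FROM df GROUP BY a;  -- column delimiter: DATATYPE_MISMATCH.NON_FOLDABLE_INPUT
]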
diff --git
a/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out
b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out
new file mode 100644
index 000000000000..cf3bac04f09c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/listagg-collations.sql.out
@@ -0,0 +1,82 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_binary) FROM (VALUES
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_binary) ASC
NULLS FIRST):string>
+-- !query output
+ABab
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE utf8_lcase) FROM (VALUES
('a'), ('A'), ('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, utf8_lcase) ASC
NULLS FIRST):string>
+-- !query output
+aAbB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_binary) FROM (VALUES ('a'), ('A'),
('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_binary), NULL):string>
+-- !query output
+aAbB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) FROM (VALUES ('a'), ('A'),
('b'), ('B')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_lcase), NULL):string collate
UTF8_LCASE>
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1
COLLATE utf8_lcase) FROM (VALUES ('a'), ('B'), ('b'), ('A')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, utf8_lcase), NULL) WITHIN GROUP (ORDER BY
collate(c1, utf8_lcase) ASC NULLS FIRST):string collate UTF8_LCASE>
+-- !query output
+aB
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE unicode_rtrim) FROM (VALUES ('abc '),
('abc '), ('x'), ('abc')) AS t(c1)
+-- !query schema
+struct<listagg(DISTINCT collate(c1, unicode_rtrim), NULL):string collate
UNICODE_RTRIM>
+-- !query output
+abc x
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1) FROM (VALUES ('abc '), ('abc
'), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY c1 ASC NULLS FIRST):string>
+-- !query output
+abcabc
+abc abc x
+
+
+-- !query
+SELECT listagg(c1) WITHIN GROUP (ORDER BY c1 COLLATE unicode_rtrim) FROM
(VALUES ('abc '), ('abc '), ('abc\n'), ('abc'), ('x')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL) WITHIN GROUP (ORDER BY collate(c1, unicode_rtrim) ASC
NULLS FIRST):string>
+-- !query output
+abc abc abcabc
+x
+
+
+-- !query
+SELECT listagg(DISTINCT c1 COLLATE utf8_lcase) WITHIN GROUP (ORDER BY c1
COLLATE utf8_binary) FROM (VALUES ('a'), ('b'), ('A'), ('B')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" :
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"collate(c1, utf8_lcase)\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"collate(c1, utf8_binary)\""
+ }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/listagg.sql.out
b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out
new file mode 100644
index 000000000000..ef580704992c
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/listagg.sql.out
@@ -0,0 +1,368 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL,
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct<string_agg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct<listagg(b, NULL):string>
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct<listagg(b, |):string>
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct<listagg(a, NULL):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct<listagg(DISTINCT a, NULL):string>
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a ASC NULLS FIRST):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST):string>
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY a DESC NULLS LAST) OVER
(PARTITION BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING):string>
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b ASC NULLS FIRST):string>
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST):string>
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct<listagg(a, |) WITHIN GROUP (ORDER BY b DESC NULLS LAST):string>
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a ASC NULLS
FIRST):string>
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct<listagg(a, NULL) WITHIN GROUP (ORDER BY b DESC NULLS LAST, a DESC NULLS
LAST):string>
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL):binary>
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, NULL):binary>
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<listagg(c1, X'42'):binary>
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct<listagg(a, NULL):string,listagg(b, ,):string>
+-- !query output
+123 true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY('a', 'b'))) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"c1\"",
+ "inputType" : "\"ARRAY<STRING>\"",
+ "paramIndex" : "first",
+ "requiredType" : "(\"STRING\" or \"BINARY\")",
+ "sqlExpr" : "\"listagg(c1, NULL)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 18,
+ "fragment" : "listagg(c1)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "dataType" : "(\"BINARY\" or \"STRING\")",
+ "functionName" : "`listagg`",
+ "sqlExpr" : "\"listagg(c1, , )\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 24,
+ "fragment" : "listagg(c1, ', ')"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputExpr" : "\"a\"",
+ "inputName" : "`delimiter`",
+ "inputType" : "\"STRING\"",
+ "sqlExpr" : "\"listagg(b, a)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 20,
+ "fragment" : "listagg(b, a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<listagg(a, NULL) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW):string>
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 61,
+ "fragment" : "listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "aggFunc" : "\"listagg(a, NULL, a ASC NULLS FIRST)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 64,
+ "fragment" : "string_agg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "windowExpr" : "\"listagg(DISTINCT a, NULL) OVER (ORDER BY a ASC NULLS
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 44,
+ "fragment" : "listagg(DISTINCT a) OVER (ORDER BY a)"
+ } ]
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" :
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"a\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"b\""
+ }
+}
+
+
+-- !query
+SELECT listagg(DISTINCT a) WITHIN GROUP (ORDER BY a, b) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" :
"INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT",
+ "sqlState" : "42K0K",
+ "messageParameters" : {
+ "funcArg" : "\"a\"",
+ "funcName" : "`listagg`",
+ "orderingExpr" : "\"a\", \"b\""
+ }
+}
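[Editor's note: the window results above draw a line between the two ordering
mechanisms. listagg accepts either a WITHIN GROUP ordering or a window ORDER BY,
but not both at once. The two accepted forms, taken from the passing queries in
this file:

    SELECT listagg(a) OVER (ORDER BY a) FROM df;  -- running aggregate, window ordering
    SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df;  -- per-partition aggregate, WITHIN GROUP ordering
]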
diff --git a/sql/core/src/test/resources/sql-tests/results/mode.sql.out
b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
index ad7d59eeb163..70f253066d4f 100644
--- a/sql/core/src/test/resources/sql-tests/results/mode.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
@@ -51,7 +51,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -373,7 +373,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -397,7 +397,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`mode`"
@@ -421,7 +421,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
"sqlState" : "42K0K",
"messageParameters" : {
"actualNum" : "1",
diff --git a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
index cd95eee186e1..55aaa8ee7378 100644
--- a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
@@ -222,7 +222,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -246,7 +246,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.DISTINCT_UNSUPPORTED",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -324,7 +324,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -348,7 +348,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WITHIN_GROUP_MISSING",
"sqlState" : "42K0K",
"messageParameters" : {
"funcName" : "`percentile_cont`"
@@ -372,7 +372,7 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+ "errorClass" : "INVALID_WITHIN_GROUP_EXPRESSION.WRONG_NUM_ORDERINGS",
"sqlState" : "42K0K",
"messageParameters" : {
"actualNum" : "2",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6348e5f31539..ad80dc65926b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -621,6 +621,41 @@ class DataFrameAggregateSuite extends QueryTest
)
}
+ test("listagg function") {
+ // normal case
+ val df = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+ checkAnswer(
+ df.selectExpr("listagg(a)", "listagg(b)"),
+ Seq(Row("abc", "bcd"))
+ )
+ checkAnswer(
+ df.select(listagg($"a"), listagg($"b")),
+ Seq(Row("abc", "bcd"))
+ )
+
+ // distinct case
+ val df2 = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
+ checkAnswer(
+ df2.select(listagg_distinct($"a"), listagg_distinct($"b")),
+ Seq(Row("ab", "bd"))
+ )
+
+ // null case
+ val df3 = Seq(("a", "b", null), ("a", "b", null), (null, null,
null)).toDF("a", "b", "c")
+ checkAnswer(
+ df3.select(listagg_distinct($"a"), listagg($"a"),
listagg_distinct($"b"), listagg($"b"),
+ listagg($"c")),
+ Seq(Row("a", "aa", "b", "bb", null))
+ )
+
+ // custom delimiter
+ val df4 = Seq(("a", "b"), ("b", "c"), ("c", "d")).toDF("a", "b")
+ checkAnswer(
+ df4.selectExpr("listagg(a, '|')", "listagg(b, '|')"),
+ Seq(Row("a|b|c", "b|c|d"))
+ )
+ }
+
test("SPARK-31500: collect_set() of BinaryType returns duplicate elements") {
val bytesTest1 = "test1".getBytes
val bytesTest2 = "test2".getBytes
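[Editor's note: for reference, a minimal standalone sketch of the new DataFrame
API exercised by the test above, assuming a SparkSession `spark` with implicits
imported:

    import org.apache.spark.sql.functions.{listagg, listagg_distinct}
    import spark.implicits._

    val df = Seq(("a", "b"), ("a", "b"), ("b", "d")).toDF("a", "b")
    // listagg concatenates every non-null value; listagg_distinct drops duplicates first.
    // Expected single row: "aab", "ab" (concatenation order is not guaranteed without WITHIN GROUP)
    df.select(listagg($"a"), listagg_distinct($"a")).show()
]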
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 975a82e26f4e..4494057b1eef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -73,7 +73,9 @@ class DataFrameFunctionsSuite extends QueryTest with
SharedSparkSession {
"sum_distinct", // equivalent to sum(distinct foo)
"typedLit", "typedlit", // Scala only
"udaf", "udf", // create function statement in sql
- "call_function" // moot in SQL as you just call the function directly
+ "call_function", // moot in SQL as you just call the function directly
+ "listagg_distinct", // equivalent to listagg(distinct foo)
+ "string_agg_distinct" // equivalent to string_agg(distinct foo)
)
val excludedSqlFunctions = Set.empty[String]
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 662f43fc0039..283454ad273e 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -104,6 +104,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite
with SharedThriftServ
"timestampNTZ/datetime-special-ansi.sql",
// SPARK-47264
"collations.sql",
+ "listagg-collations.sql",
"pipe-operators.sql",
// VARIANT type
"variant/named-function-arguments.sql"