This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ea222a3c72dd [SPARK-49566][SQL] Add SQL pipe syntax for the EXTEND
operator
ea222a3c72dd is described below
commit ea222a3c72dd9ec5461583e7445347b4a7f54194
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Nov 22 13:28:57 2024 +0800
[SPARK-49566][SQL] Add SQL pipe syntax for the EXTEND operator
### What changes were proposed in this pull request?
This PR adds SQL pipe syntax support for the EXTEND operator.
This operator preserves the existing input table and adds one or more new
computed columns whose values result from evaluating the specified
expressions. This is equivalent to `SELECT *, <newExpressions>` in the SQL
compiler. It is provided as a convenience feature and some functionality
overlap exists with lateral column aliases.
For example:
```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');
TABLE t
|> EXTEND x + LENGTH(y) AS z;
+----+-----+-----+
| x | y | z |
+----+-----+-----+
| 0 | abc | 3 |
| 1 | def | 4 |
+----+-----+-----+
```
Like the `|> SELECT` operator, aggregate functions are not allowed in these
expressions. During the course of developing reasonable error messages for
this, I found that the SQL pipe syntax research paper also specified that the
`|> AGGREGATE` operator should require that each non-grouping expression
contain at least one aggregate function; I added a check and reasonable error
message for this case as well.
### Why are the changes needed?
The SQL pipe operator syntax will let users compose queries in a more
flexible fashion.
### Does this PR introduce _any_ user-facing change?
Yes, see above.
### How was this patch tested?
This PR adds a few unit test cases, but mostly relies on golden file test
coverage. I did this to make sure the answers are correct as this feature is
implemented and also so we can look at the analyzer output plans to ensure they
look right as well.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48854 from dtenedor/pipe-syntax-projections.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 10 +-
docs/sql-ref-ansi-compliance.md | 1 +
.../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 1 +
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 3 +
.../sql/catalyst/expressions/pipeOperators.scala | 48 ++--
.../spark/sql/catalyst/parser/AstBuilder.scala | 38 ++-
.../spark/sql/errors/QueryCompilationErrors.scala | 13 +-
.../analyzer-results/pipe-operators.sql.out | 283 +++++++++++++++++----
.../resources/sql-tests/inputs/pipe-operators.sql | 84 +++++-
.../sql-tests/results/keywords-enforced.sql.out | 1 +
.../resources/sql-tests/results/keywords.sql.out | 1 +
.../sql-tests/results/nonansi/keywords.sql.out | 1 +
.../sql-tests/results/pipe-operators.sql.out | 251 +++++++++++++++---
.../spark/sql/execution/SparkSqlParserSuite.scala | 4 +-
.../ThriftServerWithSparkContextSuite.scala | 2 +-
15 files changed, 618 insertions(+), 123 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 38b1656ac05c..94513cca1023 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3995,9 +3995,15 @@
],
"sqlState" : "42K03"
},
- "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
+ "PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION" : {
"message" : [
- "Aggregate function <expr> is not allowed when using the pipe operator
|> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
+ "Non-grouping expression <expr> is provided as an argument to the |>
AGGREGATE pipe operator but does not contain any aggregate function; please
update it to include an aggregate function and then retry the query again."
+ ],
+ "sqlState" : "0A000"
+ },
+ "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION" : {
+ "message" : [
+ "Aggregate function <expr> is not allowed when using the pipe operator
|> <clause> clause; please use the pipe operator |> AGGREGATE clause instead."
],
"sqlState" : "0A000"
},
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index e05c33509400..7af54850f5da 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -514,6 +514,7 @@ Below is a list of all the keywords in Spark SQL.
|EXISTS|non-reserved|non-reserved|reserved|
|EXPLAIN|non-reserved|non-reserved|non-reserved|
|EXPORT|non-reserved|non-reserved|non-reserved|
+|EXTEND|non-reserved|non-reserved|non-reserved|
|EXTENDED|non-reserved|non-reserved|non-reserved|
|EXTERNAL|non-reserved|non-reserved|reserved|
|EXTRACT|non-reserved|non-reserved|reserved|
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 085e723d02bc..eeebe89de8ff 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -228,6 +228,7 @@ EXCLUDE: 'EXCLUDE';
EXISTS: 'EXISTS';
EXPLAIN: 'EXPLAIN';
EXPORT: 'EXPORT';
+EXTEND: 'EXTEND';
EXTENDED: 'EXTENDED';
EXTERNAL: 'EXTERNAL';
EXTRACT: 'EXTRACT';
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 55a4b85ecb6b..cdee8c906054 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1503,6 +1503,7 @@ version
operatorPipeRightSide
: selectClause windowClause?
+ | EXTEND extendList=namedExpressionSeq
// Note that the WINDOW clause is not allowed in the WHERE pipe operator,
but we add it here in
// the grammar simply for purposes of catching this invalid syntax and
throwing a specific
// dedicated error message.
@@ -1617,6 +1618,7 @@ ansiNonReserved
| EXISTS
| EXPLAIN
| EXPORT
+ | EXTEND
| EXTENDED
| EXTERNAL
| EXTRACT
@@ -1963,6 +1965,7 @@ nonReserved
| EXISTS
| EXPLAIN
| EXPORT
+ | EXTEND
| EXTENDED
| EXTERNAL
| EXTRACT
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index a0f219821268..1b5ee5472913 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -18,41 +18,55 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT,
RUNTIME_REPLACEABLE, TreePattern}
import org.apache.spark.sql.errors.QueryCompilationErrors
/**
- * Represents a SELECT clause when used with the |> SQL pipe operator.
- * We use this to make sure that no aggregate functions exist in the SELECT
expressions.
+ * Represents an expression when used with a SQL pipe operator.
+ * We use this to check invariants about whether aggregate functions may exist
in these expressions.
+ * @param child The child expression.
+ * @param isAggregate Whether the pipe operator is |> AGGREGATE.
+ * If true, the child expression must contain at least one
aggregate function.
+ * If false, the child expression must not contain any
aggregate functions.
+ * @param clause The clause of the pipe operator. This is used to generate
error messages.
*/
-case class PipeSelect(child: Expression)
+case class PipeExpression(child: Expression, isAggregate: Boolean, clause:
String)
extends UnaryExpression with RuntimeReplaceable {
- final override val nodePatterns: Seq[TreePattern] =
Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE)
- override def withNewChildInternal(newChild: Expression): Expression =
PipeSelect(newChild)
+ override def withNewChildInternal(newChild: Expression): Expression =
+ PipeExpression(newChild, isAggregate, clause)
override lazy val replacement: Expression = {
- def visit(e: Expression): Unit = e match {
- case a: AggregateFunction =>
- // If we used the pipe operator |> SELECT clause to specify an
aggregate function, this is
- // invalid; return an error message instructing the user to use the
pipe operator
- // |> AGGREGATE clause for this purpose instead.
- throw
QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a)
- case _: WindowExpression =>
- // Window functions are allowed in pipe SELECT operators, so do not
traverse into children.
- case _ =>
- e.children.foreach(visit)
+ val firstAggregateFunction: Option[AggregateFunction] =
findFirstAggregate(child)
+ if (isAggregate && firstAggregateFunction.isEmpty) {
+ throw
QueryCompilationErrors.pipeOperatorAggregateExpressionContainsNoAggregateFunction(child)
+ } else if (!isAggregate) {
+ firstAggregateFunction.foreach { a =>
+ throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a,
clause)
+ }
}
- visit(child)
child
}
+
+ /** Returns the first aggregate function in the given expression, or None if
not found. */
+ private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e
match {
+ case a: AggregateFunction =>
+ Some(a)
+ case _: WindowExpression =>
+ // Window functions are allowed in these pipe operators, so do not
traverse into children.
+ None
+ case _ =>
+ e.children.flatMap(findFirstAggregate).headOption
+ }
}
object PipeOperators {
// These are definitions of query result clauses that can be used with the
pipe operator.
+ val aggregateClause = "AGGREGATE"
val clusterByClause = "CLUSTER BY"
val distributeByClause = "DISTRIBUTE BY"
+ val extendClause = "EXTEND"
val limitClause = "LIMIT"
val offsetClause = "OFFSET"
val orderByClause = "ORDER BY"
+ val selectClause = "SELECT"
val sortByClause = "SORT BY"
val sortByDistributeByClause = "SORT BY ... DISTRIBUTE BY ..."
val windowClause = "WINDOW"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 1604a0b5aba1..08a8cf6bab87 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1236,7 +1236,7 @@ class AstBuilder extends DataTypeAstBuilder
* Add a regular (SELECT) query specification to a logical plan. The query
specification
* is the core of the logical plan, this is where sourcing (FROM clause),
projection (SELECT),
* aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
- * If 'isPipeOperatorSelect' is true, wraps each projected expression with a
[[PipeSelect]]
+ * If 'isPipeOperatorSelect' is true, wraps each projected expression with a
[[PipeExpression]]
* expression for future validation of the expressions during analysis.
*
* Note that query hints are ignored (both by the parser and the builder).
@@ -1293,11 +1293,12 @@ class AstBuilder extends DataTypeAstBuilder
def createProject() = if (namedExpressions.nonEmpty) {
val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
- // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]]
expression wrapping
+ // If this is a pipe operator |> SELECT clause, add a
[[PipeExpression]] wrapping
// each alias in the project list, so the analyzer can check
invariants later.
namedExpressions.map {
case a: Alias =>
- a.withNewChildren(Seq(PipeSelect(a.child)))
+ a.withNewChildren(Seq(
+ PipeExpression(a.child, isAggregate = false,
PipeOperators.selectClause)))
.asInstanceOf[NamedExpression]
case other =>
other
@@ -5933,6 +5934,24 @@ class AstBuilder extends DataTypeAstBuilder
windowClause = ctx.windowClause,
relation = left,
isPipeOperatorSelect = true)
+ }.getOrElse(Option(ctx.EXTEND).map { _ =>
+ // Visit each expression in the EXTEND operator, and add a
PipeExpression expression on top of
+ // it to generate clear error messages if the expression contains any
aggregate functions
+ // (this is not allowed in the EXTEND operator).
+ val extendExpressions: Seq[NamedExpression] =
+ Option(ctx.extendList).map { n: NamedExpressionSeqContext =>
+ visitNamedExpressionSeq(n).map {
+ case (a: Alias, _) =>
+ a.copy(
+ child = PipeExpression(a.child, isAggregate = false,
PipeOperators.extendClause))(
+ a.exprId, a.qualifier, a.explicitMetadata,
a.nonInheritableMetadataKeys)
+ case (e: Expression, aliasFunc) =>
+ UnresolvedAlias(
+ PipeExpression(e, isAggregate = false,
PipeOperators.extendClause), aliasFunc)
+ }
+ }.get
+ val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) ++
extendExpressions
+ Project(projectList, left)
}.getOrElse(Option(ctx.whereClause).map { c =>
if (ctx.windowClause() != null) {
throw
QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
@@ -5959,7 +5978,7 @@ class AstBuilder extends DataTypeAstBuilder
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
}.getOrElse(
visitOperatorPipeAggregate(ctx, left)
- ))))))))
+ )))))))))
}
private def visitOperatorPipeAggregate(
@@ -5970,11 +5989,18 @@ class AstBuilder extends DataTypeAstBuilder
"The AGGREGATE clause requires a list of aggregate expressions " +
"or a list of grouping expressions, or both", ctx)
}
+ // Visit each aggregate expression, and add a PipeAggregate expression on
top of it to generate
+ // clear error messages if the expression does not contain at least one
aggregate function.
val aggregateExpressions: Seq[NamedExpression] =
Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext =>
visitNamedExpressionSeq(n).map {
- case (e: NamedExpression, _) => e
- case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc)
+ case (a: Alias, _) =>
+ a.copy(child =
+ PipeExpression(a.child, isAggregate = true,
PipeOperators.aggregateClause))(
+ a.exprId, a.qualifier, a.explicitMetadata,
a.nonInheritableMetadataKeys)
+ case (e: Expression, aliasFunc) =>
+ UnresolvedAlias(
+ PipeExpression(e, isAggregate = true,
PipeOperators.aggregateClause), aliasFunc)
}
}.getOrElse(Seq.empty)
Option(ctx.aggregationClause()).map { c: AggregationClauseContext =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b628412929e3..03471ae8a3da 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4135,14 +4135,23 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
)
}
- def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable
= {
+ def pipeOperatorAggregateExpressionContainsNoAggregateFunction(expr:
Expression): Throwable = {
new AnalysisException(
- errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ errorClass =
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
messageParameters = Map(
"expr" -> expr.toString),
origin = expr.origin)
}
+ def pipeOperatorContainsAggregateFunction(expr: Expression, clause: String):
Throwable = {
+ new AnalysisException(
+ errorClass = "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
+ messageParameters = Map(
+ "expr" -> expr.toString,
+ "clause" -> clause),
+ origin = expr.origin)
+ }
+
def inlineTableContainsScalarSubquery(inlineTable: LogicalPlan): Throwable =
{
new AnalysisException(
errorClass =
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.SCALAR_SUBQUERY_IN_VALUES",
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 2e38ed137b41..bc5b642e3da0 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -269,7 +269,7 @@ CreateViewCommand `windowTestData`, select * from values
table t
|> select 1 as x
-- !query analysis
-Project [pipeselect(1) AS x#x]
+Project [pipeexpression(1, false, SELECT) AS x#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -288,7 +288,7 @@ table t
|> select x, y
|> select x + length(y) as z
-- !query analysis
-Project [pipeselect((x#x + length(y#x))) AS z#x]
+Project [pipeexpression((x#x + length(y#x)), false, SELECT) AS z#x]
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -298,7 +298,7 @@ Project [pipeselect((x#x + length(y#x))) AS z#x]
values (0), (1) tab(col)
|> select col * 2 as result
-- !query analysis
-Project [pipeselect((col#x * 2)) AS result#x]
+Project [pipeexpression((col#x * 2), false, SELECT) AS result#x]
+- SubqueryAlias tab
+- LocalRelation [col#x]
@@ -307,7 +307,7 @@ Project [pipeselect((col#x * 2)) AS result#x]
(select * from t union all select * from t)
|> select x + length(y) as result
-- !query analysis
-Project [pipeselect((x#x + length(y#x))) AS result#x]
+Project [pipeexpression((x#x + length(y#x)), false, SELECT) AS result#x]
+- Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
@@ -358,7 +358,7 @@ Project [col#x.i1 AS i1#x]
table t
|> select (select a from other where x = a limit 1) as result
-- !query analysis
-Project [pipeselect(scalar-subquery#x [x#x]) AS result#x]
+Project [pipeexpression(scalar-subquery#x [x#x], false, SELECT) AS result#x]
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Project [a#x]
@@ -383,7 +383,7 @@ Project [scalar-subquery#x [] AS result#x]
table t
|> select (select any_value(a) from other where x = a limit 1) as result
-- !query analysis
-Project [pipeselect(scalar-subquery#x [x#x]) AS result#x]
+Project [pipeexpression(scalar-subquery#x [x#x], false, SELECT) AS result#x]
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Aggregate [any_value(a#x, false) AS any_value(a)#x]
@@ -398,8 +398,8 @@ Project [pipeselect(scalar-subquery#x [x#x]) AS result#x]
table t
|> select x + length(x) as z, z + 1 as plus_one
-- !query analysis
-Project [z#x, pipeselect((z#x + 1)) AS plus_one#x]
-+- Project [x#x, y#x, pipeselect((x#x + length(cast(x#x as string)))) AS z#x]
+Project [z#x, pipeexpression((z#x + 1), false, SELECT) AS plus_one#x]
++- Project [x#x, y#x, pipeexpression((x#x + length(cast(x#x as string))),
false, SELECT) AS z#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -409,7 +409,7 @@ table t
|> select first_value(x) over (partition by y) as result
-- !query analysis
Project [result#x]
-+- Project [x#x, y#x, _we0#x, pipeselect(_we0#x) AS result#x]
++- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, SELECT) AS
result#x]
+- Window [first_value(x#x, false) windowspecdefinition(y#x,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS _we0#x], [y#x]
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
@@ -426,7 +426,7 @@ select 1 x, 2 y, 3 z
-- !query analysis
Project [a2#x]
+- Project [(1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING)#x, x#x, a2#x]
- +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) +
_we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING)#x, pipeselect(_we2#x) AS a2#x]
+ +- Project [x#x, y#x, _w1#x, z#x, _we0#xL, avg(y) OVER (ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#x, _we2#x, (cast(1 as bigint) +
_we0#xL) AS (1 + sum(x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING))#xL, avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING)#x, pipeexpression(_we2#x, false, SELECT) AS a2#x]
+- Window [avg(_w1#x) windowspecdefinition(y#x, z#x ASC NULLS FIRST,
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS
_we2#x], [y#x], [z#x ASC NULLS FIRST]
+- Window [sum(x#x)
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(),
unboundedfollowing$())) AS _we0#xL, avg(y#x)
windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(),
unboundedfollowing$())) AS avg(y) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING)#x]
+- Project [x#x, y#x, (x#x + 1) AS _w1#x, z#x]
@@ -513,9 +513,10 @@ table t
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(x#x)"
},
"queryContext" : [ {
@@ -534,9 +535,10 @@ table t
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(x#x)"
},
"queryContext" : [ {
@@ -549,6 +551,186 @@ org.apache.spark.sql.AnalysisException
}
+-- !query
+table t
+|> extend 1 as z
+-- !query analysis
+Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS z#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend 1
+-- !query analysis
+Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS pipeexpression(1)#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend x as z
+-- !query analysis
+Project [x#x, y#x, pipeexpression(x#x, false, EXTEND) AS z#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend x + length(y) as z
+-- !query analysis
+Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS z#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend x + length(y) as z, x + 1 as zz
+-- !query analysis
+Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS z#x,
pipeexpression((x#x + 1), false, EXTEND) AS zz#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend x + length(y) as z
+|> extend z + 1 as zz
+-- !query analysis
+Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, EXTEND) AS zz#x]
++- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS
z#x]
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+select col from st
+|> extend col.i1 as z
+-- !query analysis
+Project [col#x, pipeexpression(col#x.i1, false, EXTEND) AS z#x]
++- Project [col#x]
+ +- SubqueryAlias spark_catalog.default.st
+ +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+table t
+|> extend (select a from other where x = a limit 1) as z
+-- !query analysis
+Project [x#x, y#x, pipeexpression(scalar-subquery#x [x#x], false, EXTEND) AS
z#x]
+: +- GlobalLimit 1
+: +- LocalLimit 1
+: +- Project [a#x]
+: +- Filter (outer(x#x) = a#x)
+: +- SubqueryAlias spark_catalog.default.other
+: +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> where exists (
+ table other
+ |> extend t.x
+ |> select * except (a, b))
+-- !query analysis
+Filter exists#x [x#x]
+: +- Project [pipeexpression(outer(spark_catalog.default.t.x))#x]
+: +- Project [a#x, b#x, pipeexpression(outer(x#x), false, EXTEND) AS
pipeexpression(outer(spark_catalog.default.t.x))#x]
+: +- SubqueryAlias spark_catalog.default.other
+: +- Relation spark_catalog.default.other[a#x,b#x] json
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend 1 as x
+-- !query analysis
+Project [x#x, y#x, pipeexpression(1, false, EXTEND) AS x#x]
++- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend first_value(x) over (partition by y) as result
+-- !query analysis
+Project [x#x, y#x, result#x]
++- Project [x#x, y#x, _we0#x, pipeexpression(_we0#x, false, EXTEND) AS
result#x]
+ +- Window [first_value(x#x, false) windowspecdefinition(y#x,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS _we0#x], [y#x]
+ +- Project [x#x, y#x]
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend x + length(y) as z, z + 1 as plus_one
+-- !query analysis
+Project [x#x, y#x, z#x, pipeexpression((z#x + 1), false, EXTEND) AS plus_one#x]
++- Project [x#x, y#x, pipeexpression((x#x + length(y#x)), false, EXTEND) AS
z#x]
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> extend sum(x) as z
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "clause" : "EXTEND",
+ "expr" : "sum(x#x)"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 19,
+ "stopIndex" : 24,
+ "fragment" : "sum(x)"
+ } ]
+}
+
+
+-- !query
+table t
+|> extend distinct x as z
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+ "errorClass" : "PARSE_SYNTAX_ERROR",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "error" : "'as'",
+ "hint" : ""
+ }
+}
+
+
+-- !query
+table t
+|> extend *
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "INVALID_USAGE_OF_STAR_OR_REGEX",
+ "sqlState" : "42000",
+ "messageParameters" : {
+ "elem" : "'*'",
+ "prettyName" : "expression `pipeexpression`"
+ }
+}
+
+
-- !query
table t
|> where true
@@ -822,7 +1004,7 @@ table courseSales
Project [c#x, __pivot_sum(e) AS s AS `sum(e) AS s`#x[0] AS firstYear_s#xL,
__pivot_avg(e) AS a AS `avg(e) AS a`#x[0] AS firstYear_a#x, __pivot_sum(e) AS s
AS `sum(e) AS s`#x[1] AS secondYear_s#xL, __pivot_avg(e) AS a AS `avg(e) AS
a`#x[1] AS secondYear_a#x]
+- Aggregate [c#x], [c#x, pivotfirst(y#x, sum(e) AS s#xL, 2012, 2013, 0, 0) AS
__pivot_sum(e) AS s AS `sum(e) AS s`#x, pivotfirst(y#x, avg(e) AS a#x, 2012,
2013, 0, 0) AS __pivot_avg(e) AS a AS `avg(e) AS a`#x]
+- Aggregate [c#x, y#x], [c#x, y#x, sum(e#x) AS sum(e) AS s#xL, avg(e#x) AS
avg(e) AS a#x]
- +- Project [pipeselect(year#x) AS y#x, pipeselect(course#x) AS c#x,
pipeselect(earnings#x) AS e#x]
+ +- Project [pipeexpression(year#x, false, SELECT) AS y#x,
pipeexpression(course#x, false, SELECT) AS c#x, pipeexpression(earnings#x,
false, SELECT) AS e#x]
+- SubqueryAlias coursesales
+- View (`courseSales`, [course#x, year#x, earnings#x])
+- Project [cast(course#x as string) AS course#x, cast(year#x
as int) AS year#x, cast(earnings#x as int) AS earnings#x]
@@ -2300,7 +2482,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
table other
|> aggregate sum(b) as result group by a
-- !query analysis
-Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
+Aggregate [a#x], [a#x, pipeexpression(sum(b#x), true, AGGREGATE) AS result#xL]
+- SubqueryAlias spark_catalog.default.other
+- Relation spark_catalog.default.other[a#x,b#x] json
@@ -2311,7 +2493,7 @@ table other
|> select result
-- !query analysis
Project [result#xL]
-+- Aggregate [a#x], [a#x, sum(b#x) AS result#xL]
++- Aggregate [a#x], [a#x, pipeexpression(sum(b#x), true, AGGREGATE) AS
result#xL]
+- SubqueryAlias spark_catalog.default.other
+- Relation spark_catalog.default.other[a#x,b#x] json
@@ -2322,7 +2504,7 @@ table other
|> select gkey
-- !query analysis
Project [gkey#x]
-+- Aggregate [(a#x + 1)], [(a#x + 1) AS gkey#x, sum(b#x) AS sum(b)#xL]
++- Aggregate [(a#x + 1)], [(a#x + 1) AS gkey#x, pipeexpression(sum(b#x), true,
AGGREGATE) AS pipeexpression(sum(b))#xL]
+- SubqueryAlias spark_catalog.default.other
+- Relation spark_catalog.default.other[a#x,b#x] json
@@ -2349,7 +2531,7 @@ Aggregate [1, 2], [1 AS 1#x, 2 AS 2#x]
table t
|> aggregate sum(x)
-- !query analysis
-Aggregate [sum(x#x) AS sum(x)#xL]
+Aggregate [pipeexpression(sum(x#x), true, AGGREGATE) AS
pipeexpression(sum(x))#xL]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -2358,7 +2540,7 @@ Aggregate [sum(x#x) AS sum(x)#xL]
table t
|> aggregate sum(x) + 1 as result_plus_one
-- !query analysis
-Aggregate [(sum(x#x) + cast(1 as bigint)) AS result_plus_one#xL]
+Aggregate [pipeexpression((sum(x#x) + cast(1 as bigint)), true, AGGREGATE) AS
result_plus_one#xL]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -2408,9 +2590,9 @@ select 1 x, 2 y, 3 z
|> aggregate avg(z) z group by x
|> aggregate count(distinct z) c
-- !query analysis
-Aggregate [count(distinct z#x) AS c#xL]
-+- Aggregate [x#x], [x#x, avg(z#xL) AS z#x]
- +- Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS z#xL]
+Aggregate [pipeexpression(count(distinct z#x), true, AGGREGATE) AS c#xL]
++- Aggregate [x#x], [x#x, pipeexpression(avg(z#xL), true, AGGREGATE) AS z#x]
+ +- Aggregate [x#x, y#x], [x#x, y#x, pipeexpression(sum(z#x), true,
AGGREGATE) AS z#xL]
+- Project [1 AS x#x, 2 AS y#x, 3 AS z#x]
+- OneRowRelation
@@ -2421,27 +2603,39 @@ select 1 x, 3 z
|> select x
-- !query analysis
Project [x#x]
-+- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, count(1) AS count(1)#xL]
++- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, pipeexpression(count(1), true,
AGGREGATE) AS pipeexpression(count(1))#xL]
+- Project [1 AS x#x, 3 AS z#x]
+- OneRowRelation
-- !query
table other
-|> aggregate a group by a
+|> aggregate a + count(b) group by a
-- !query analysis
-Aggregate [a#x], [a#x, a#x]
+Aggregate [a#x], [a#x, pipeexpression((cast(a#x as bigint) + count(b#x)),
true, AGGREGATE) AS pipeexpression((a + count(b)))#xL]
+- SubqueryAlias spark_catalog.default.other
+- Relation spark_catalog.default.other[a#x,b#x] json
-- !query
table other
-|> aggregate a + count(b) group by a
+|> aggregate a group by a
-- !query analysis
-Aggregate [a#x], [a#x, (cast(a#x as bigint) + count(b#x)) AS (a + count(b))#xL]
-+- SubqueryAlias spark_catalog.default.other
- +- Relation spark_catalog.default.other[a#x,b#x] json
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" :
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "expr" : "a#x"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 37,
+ "stopIndex" : 37,
+ "fragment" : "a"
+ } ]
+}
-- !query
@@ -2596,16 +2790,19 @@ org.apache.spark.sql.catalyst.parser.ParseException
table other
|> aggregate a
-- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
+org.apache.spark.sql.AnalysisException
{
- "errorClass" : "MISSING_GROUP_BY",
- "sqlState" : "42803",
+ "errorClass" :
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "expr" : "a#x"
+ },
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
- "startIndex" : 1,
+ "startIndex" : 26,
"stopIndex" : 26,
- "fragment" : "table other\n|> aggregate a"
+ "fragment" : "a"
} ]
}
@@ -2616,9 +2813,10 @@ table other
-- !query analysis
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(a#x)"
},
"queryContext" : [ {
@@ -2733,21 +2931,6 @@ org.apache.spark.sql.AnalysisException
}
--- !query
-table other
-|> aggregate b group by a
--- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
- "errorClass" : "MISSING_AGGREGATION",
- "sqlState" : "42803",
- "messageParameters" : {
- "expression" : "\"b\"",
- "expressionAnyValue" : "\"any_value(b)\""
- }
-}
-
-
-- !query
table windowTestData
|> select cate, sum(val) over w
@@ -2808,7 +2991,7 @@ Project [cate#x, val#x, sum_val#xL, first_value(cate)
OVER (ORDER BY val ASC NUL
+- Window [first_value(cate#x, false) windowspecdefinition(val#x ASC NULLS
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$()))
AS first_value(cate) OVER (ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW)#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x, sum_val#xL]
+- Project [cate#x, val#x, sum_val#xL]
- +- Project [cate#x, val#x, _we0#xL, pipeselect(_we0#xL) AS
sum_val#xL]
+ +- Project [cate#x, val#x, _we0#xL, pipeexpression(_we0#xL, false,
SELECT) AS sum_val#xL]
+- Window [sum(val#x) windowspecdefinition(cate#x,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS _we0#xL], [cate#x]
+- Project [cate#x, val#x]
+- SubqueryAlias windowtestdata
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 6261bc93b185..b9224db129ea 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -171,6 +171,76 @@ table t
table t
|> select y, length(y) + sum(x) as result;
+-- EXTEND operators: positive tests.
+------------------------------------
+
+-- Extending with a constant.
+table t
+|> extend 1 as z;
+
+-- Extending without an explicit alias.
+table t
+|> extend 1;
+
+-- Extending with an attribute.
+table t
+|> extend x as z;
+
+-- Extending with an expression.
+table t
+|> extend x + length(y) as z;
+
+-- Extending two times.
+table t
+|> extend x + length(y) as z, x + 1 as zz;
+
+-- Extending two times in sequence.
+table t
+|> extend x + length(y) as z
+|> extend z + 1 as zz;
+
+-- Extending with a struct field.
+select col from st
+|> extend col.i1 as z;
+
+-- Extending with a subquery.
+table t
+|> extend (select a from other where x = a limit 1) as z;
+
+-- Extending with a correlated reference.
+table t
+|> where exists (
+ table other
+ |> extend t.x
+ |> select * except (a, b));
+
+-- Extending with a column name that already exists in the input relation.
+table t
+|> extend 1 as x;
+
+-- Window functions are allowed in the pipe operator EXTEND list.
+table t
+|> extend first_value(x) over (partition by y) as result;
+
+-- Lateral column aliases in the pipe operator EXTEND list.
+table t
+|> extend x + length(y) as z, z + 1 as plus_one;
+
+-- EXTEND operators: negative tests.
+------------------------------------
+
+-- Aggregations are not allowed.
+table t
+|> extend sum(x) as z;
+
+-- DISTINCT is not supported.
+table t
+|> extend distinct x as z;
+
+-- EXTEND * is not supported.
+table t
+|> extend *;
+
-- WHERE operators: positive tests.
-----------------------------------
@@ -738,11 +808,6 @@ select 1 x, 3 z
|> aggregate count(*) group by x, z, x
|> select x;
--- Grouping expressions are allowed in the aggregate functions list if they
appear separately in the
--- GROUP BY clause.
-table other
-|> aggregate a group by a;
-
-- Aggregate expressions may contain a mix of aggregate functions and grouping
expressions.
table other
|> aggregate a + count(b) group by a;
@@ -750,6 +815,10 @@ table other
-- Aggregation operators: negative tests.
-----------------------------------------
+-- All aggregate expressions must contain at least one aggregate function.
+table other
+|> aggregate a group by a;
+
-- GROUP BY ALL is not currently supported.
select 3 as x, 4 as y
|> aggregate group by all;
@@ -815,11 +884,6 @@ select 1 x, 2 y, 3 z
|> where c = 1
|> where x = 1;
--- Aggregate expressions may not contain references to columns or expressions
not otherwise listed
--- in the GROUP BY clause.
-table other
-|> aggregate b group by a;
-
-- WINDOW operators (within SELECT): positive tests.
---------------------------------------------------
diff --git
a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
index b2331ec4ab80..7d96a3e98c83 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
@@ -115,6 +115,7 @@ EXECUTE true
EXISTS false
EXPLAIN false
EXPORT false
+EXTEND false
EXTENDED false
EXTERNAL false
EXTRACT false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index a88552502862..6cbfe519a76f 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -115,6 +115,7 @@ EXECUTE false
EXISTS false
EXPLAIN false
EXPORT false
+EXTEND false
EXTENDED false
EXTERNAL false
EXTRACT false
diff --git
a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
index a88552502862..6cbfe519a76f 100644
--- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
@@ -115,6 +115,7 @@ EXECUTE false
EXISTS false
EXPLAIN false
EXPORT false
+EXTEND false
EXTENDED false
EXTERNAL false
EXTRACT false
diff --git
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index a365e759b7c1..53aabce4d5ab 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -457,9 +457,10 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(x#x)"
},
"queryContext" : [ {
@@ -480,9 +481,10 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(x#x)"
},
"queryContext" : [ {
@@ -495,6 +497,187 @@ org.apache.spark.sql.AnalysisException
}
+-- !query
+table t
+|> extend 1 as z
+-- !query schema
+struct<x:int,y:string,z:int>
+-- !query output
+0 abc 1
+1 def 1
+
+
+-- !query
+table t
+|> extend 1
+-- !query schema
+struct<x:int,y:string,pipeexpression(1):int>
+-- !query output
+0 abc 1
+1 def 1
+
+
+-- !query
+table t
+|> extend x as z
+-- !query schema
+struct<x:int,y:string,z:int>
+-- !query output
+0 abc 0
+1 def 1
+
+
+-- !query
+table t
+|> extend x + length(y) as z
+-- !query schema
+struct<x:int,y:string,z:int>
+-- !query output
+0 abc 3
+1 def 4
+
+
+-- !query
+table t
+|> extend x + length(y) as z, x + 1 as zz
+-- !query schema
+struct<x:int,y:string,z:int,zz:int>
+-- !query output
+0 abc 3 1
+1 def 4 2
+
+
+-- !query
+table t
+|> extend x + length(y) as z
+|> extend z + 1 as zz
+-- !query schema
+struct<x:int,y:string,z:int,zz:int>
+-- !query output
+0 abc 3 4
+1 def 4 5
+
+
+-- !query
+select col from st
+|> extend col.i1 as z
+-- !query schema
+struct<col:struct<i1:int,i2:int>,z:int>
+-- !query output
+{"i1":2,"i2":3} 2
+
+
+-- !query
+table t
+|> extend (select a from other where x = a limit 1) as z
+-- !query schema
+struct<x:int,y:string,z:int>
+-- !query output
+0 abc NULL
+1 def 1
+
+
+-- !query
+table t
+|> where exists (
+ table other
+ |> extend t.x
+ |> select * except (a, b))
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0 abc
+1 def
+
+
+-- !query
+table t
+|> extend 1 as x
+-- !query schema
+struct<x:int,y:string,x:int>
+-- !query output
+0 abc 1
+1 def 1
+
+
+-- !query
+table t
+|> extend first_value(x) over (partition by y) as result
+-- !query schema
+struct<x:int,y:string,result:int>
+-- !query output
+0 abc 0
+1 def 1
+
+
+-- !query
+table t
+|> extend x + length(y) as z, z + 1 as plus_one
+-- !query schema
+struct<x:int,y:string,z:int,plus_one:int>
+-- !query output
+0 abc 3 4
+1 def 4 5
+
+
+-- !query
+table t
+|> extend sum(x) as z
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "clause" : "EXTEND",
+ "expr" : "sum(x#x)"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 19,
+ "stopIndex" : 24,
+ "fragment" : "sum(x)"
+ } ]
+}
+
+
+-- !query
+table t
+|> extend distinct x as z
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+ "errorClass" : "PARSE_SYNTAX_ERROR",
+ "sqlState" : "42601",
+ "messageParameters" : {
+ "error" : "'as'",
+ "hint" : ""
+ }
+}
+
+
+-- !query
+table t
+|> extend *
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "INVALID_USAGE_OF_STAR_OR_REGEX",
+ "sqlState" : "42000",
+ "messageParameters" : {
+ "elem" : "'*'",
+ "prettyName" : "expression `pipeexpression`"
+ }
+}
+
+
-- !query
table t
|> where true
@@ -2005,7 +2188,7 @@ struct<1:int,2:int>
table t
|> aggregate sum(x)
-- !query schema
-struct<sum(x):bigint>
+struct<pipeexpression(sum(x)):bigint>
-- !query output
1
@@ -2079,22 +2262,35 @@ struct<x:int>
-- !query
table other
-|> aggregate a group by a
+|> aggregate a + count(b) group by a
-- !query schema
-struct<a:int,a:int>
+struct<a:int,pipeexpression((a + count(b))):bigint>
-- !query output
-1 1
-2 2
+1 3
+2 3
-- !query
table other
-|> aggregate a + count(b) group by a
+|> aggregate a group by a
-- !query schema
-struct<a:int,(a + count(b)):bigint>
+struct<>
-- !query output
-1 3
-2 3
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" :
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "expr" : "a#x"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 37,
+ "stopIndex" : 37,
+ "fragment" : "a"
+ } ]
+}
-- !query
@@ -2265,16 +2461,19 @@ table other
-- !query schema
struct<>
-- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
+org.apache.spark.sql.AnalysisException
{
- "errorClass" : "MISSING_GROUP_BY",
- "sqlState" : "42803",
+ "errorClass" :
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
+ "sqlState" : "0A000",
+ "messageParameters" : {
+ "expr" : "a#x"
+ },
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
- "startIndex" : 1,
+ "startIndex" : 26,
"stopIndex" : 26,
- "fragment" : "table other\n|> aggregate a"
+ "fragment" : "a"
} ]
}
@@ -2287,9 +2486,10 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
- "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
+ "errorClass" : "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
+ "clause" : "SELECT",
"expr" : "sum(a#x)"
},
"queryContext" : [ {
@@ -2414,23 +2614,6 @@ org.apache.spark.sql.AnalysisException
}
--- !query
-table other
-|> aggregate b group by a
--- !query schema
-struct<>
--- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
- "errorClass" : "MISSING_AGGREGATION",
- "sqlState" : "42803",
- "messageParameters" : {
- "expression" : "\"b\"",
- "expressionAnyValue" : "\"any_value(b)\""
- }
-}
-
-
-- !query
table windowTestData
|> select cate, sum(val) over w
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 357fd8beb961..03d6eb1a5020 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -891,13 +891,15 @@ class SparkSqlParserSuite extends AnalysisTest with
SharedSparkSession {
// inline table.
def check(query: String, patterns: Seq[TreePattern]): Unit = {
val plan: LogicalPlan = parser.parsePlan(query)
- assert(patterns.exists(plan.containsPattern))
+ assert(patterns.exists(plan.containsPattern), s"Failed to parse
$query, plan: $plan")
assert(plan.containsAnyPattern(UNRESOLVED_RELATION, LOCAL_RELATION))
}
def checkPipeSelect(query: String): Unit = check(query, Seq(PROJECT))
checkPipeSelect("TABLE t |> SELECT 1 AS X")
checkPipeSelect("TABLE t |> SELECT 1 AS X, 2 AS Y |> SELECT X + Y AS Z")
checkPipeSelect("VALUES (0), (1) tab(col) |> SELECT col * 2 AS result")
+ checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y")
+ checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y, X + 2 Z")
// Basic WHERE operators.
def checkPipeWhere(query: String): Unit = check(query, Seq(FILTER))
checkPipeWhere("TABLE t |> WHERE X = 1")
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 71d81b06463f..2acf25640ef7 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends
SharedThriftServer {
val sessionHandle = client.openSession(user, "")
val infoValue = client.getInfo(sessionHandle,
GetInfoType.CLI_ODBC_KEYWORDS)
// scalastyle:off line.size.limit
- assert(infoValue.getStringValue ==
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURR
[...]
+ assert(infoValue.getStringValue ==
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURR
[...]
// scalastyle:on line.size.limit
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]