This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 22735bedc497 [SPARK-50772][SQL] Retain table aliases after SET,
EXTEND, DROP operators
22735bedc497 is described below
commit 22735bedc497e620d3acc9da1b8fc1e70ad02d32
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Jan 9 13:10:43 2025 +0800
[SPARK-50772][SQL] Retain table aliases after SET, EXTEND, DROP operators
### What changes were proposed in this pull request?
Per initial feedback from testing, users would like table aliases (such as
those mapping to left and right side inputs to a prior join) to remain
available after SET, DROP, and EXTEND operators. Here is an example that should
work:
```
values (0), (1) lhs(a)
|> inner join values (1), (2) rhs(a) using (a)
|> extend lhs.a + rhs.a as z1
|> extend lhs.a - rhs.a as z2
|> drop z1
|> where z2 = 0
|> order by lhs.a, rhs.a, z2
|> set z2 = 4
|> limit 2
|> select lhs.a, rhs.a, z2;
1 1 4
```
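The row flow of this example can be traced with a schematic plain-Python model (an illustration only — the dict-based rows and names here are not Spark code). Each row keeps per-alias values, which is why `lhs.a` and `rhs.a` stay addressable even after `EXTEND`, `DROP`, and `SET` rewrite the top-level columns:

```python
# Schematic model of the pipe query above (not Spark code).
lhs = [{"a": 0}, {"a": 1}]
rhs = [{"a": 1}, {"a": 2}]

# |> inner join ... using (a): keep pairs whose join keys match.
rows = [{"lhs": l, "rhs": r} for l in lhs for r in rhs if l["a"] == r["a"]]
# |> extend lhs.a + rhs.a as z1  and  |> extend lhs.a - rhs.a as z2
for row in rows:
    row["z1"] = row["lhs"]["a"] + row["rhs"]["a"]
    row["z2"] = row["lhs"]["a"] - row["rhs"]["a"]
# |> drop z1
for row in rows:
    del row["z1"]
# |> where z2 = 0
rows = [row for row in rows if row["z2"] == 0]
# |> order by lhs.a, rhs.a, z2 -- per-alias values are still available here.
rows.sort(key=lambda r: (r["lhs"]["a"], r["rhs"]["a"], r["z2"]))
# |> set z2 = 4
for row in rows:
    row["z2"] = 4
# |> limit 2
rows = rows[:2]
# |> select lhs.a, rhs.a, z2 -- aliases survive the SET/DROP steps above.
result = [(r["lhs"]["a"], r["rhs"]["a"], r["z2"]) for r in rows]
print(result)  # [(1, 1, 4)]
```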
To implement this:
* Previously, the `|> where` or `|> order by` operators added a
`SubqueryAlias` with an auto-generated table alias to the end of the logical
plan under construction, in order to prevent the analyzer from adding
attributes to the previous plan later (from
`ColumnResolutionHelper.resolveExprsAndAddMissingAttrs`).
* This PR introduces a new `PipeOperator` node for this purpose instead, which
retains the table alias while maintaining correct behavior.
* This PR also updates docs to mention the improved table alias behavior.
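The boundary change can be sketched with a toy Python model (hypothetical classes that only mimic qualifier handling; they are not Spark's actual Scala logical-plan nodes). A `SubqueryAlias` re-qualifies every attribute under its own name, so prior aliases like `lhs`/`rhs` stop resolving, while a transparent boundary node forwards the child's output unchanged:

```python
# Toy model of the analyzer-boundary change (illustration only).
class Node:
    def __init__(self, child=None):
        self.child = child

class Relation(Node):
    def __init__(self, qualified_attrs):
        super().__init__()
        self.attrs = qualified_attrs  # e.g. [("lhs", "a"), ("rhs", "a")]
    def output(self):
        return self.attrs

class SubqueryAlias(Node):
    def __init__(self, alias, child):
        super().__init__(child)
        self.alias = alias
    def output(self):
        # Re-qualifies every attribute under the new alias: old aliases vanish.
        return [(self.alias, name) for _, name in self.child.output()]

class PipeOperator(Node):
    def output(self):
        # Transparent boundary: child qualifiers survive untouched.
        return self.child.output()

join = Relation([("lhs", "a"), ("rhs", "a")])
old = SubqueryAlias("__auto_generated_subquery_name", join)
new = PipeOperator(join)
print(old.output())  # qualifiers replaced: lhs/rhs no longer resolvable
print(new.output())  # [('lhs', 'a'), ('rhs', 'a')] -- aliases retained
```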
### Why are the changes needed?
This makes SQL pipe syntax easier to use.
### Does this PR introduce _any_ user-facing change?
Yes, see above.
### How was this patch tested?
This PR adds and updates golden file based testing.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49420 from dtenedor/fix-table-aliases.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
docs/sql-pipe-syntax.md | 161 ++++++++++++++---
.../catalyst/analysis/ColumnResolutionHelper.scala | 7 +-
.../sql/catalyst/expressions/pipeOperators.scala | 22 +++
.../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
.../spark/sql/catalyst/parser/AstBuilder.scala | 15 +-
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../spark/sql/catalyst/trees/TreePatterns.scala | 1 +
.../analyzer-results/pipe-operators.sql.out | 195 +++++++++++++--------
.../resources/sql-tests/inputs/pipe-operators.sql | 12 ++
.../sql-tests/results/pipe-operators.sql.out | 27 ++-
10 files changed, 327 insertions(+), 115 deletions(-)
diff --git a/docs/sql-pipe-syntax.md b/docs/sql-pipe-syntax.md
index 829aa0e607bd..3d757db96623 100644
--- a/docs/sql-pipe-syntax.md
+++ b/docs/sql-pipe-syntax.md
@@ -198,12 +198,22 @@ TABLE t;
Evaluates the provided expressions over each of the rows of the input table.
+In general, this operator is not always required with SQL pipe syntax. It is
possible to use it at
+or near the end of a query to evaluate expressions or specify a list of output
columns.
+
+Since the final query result always comprises the columns returned from the
last pipe operator,
+when this `SELECT` operator does not appear, the output includes all columns
from the full row.
+This behavior is similar to `SELECT *` in standard SQL syntax.
+
It is possible to use `DISTINCT` and `*` as needed.<br>
This works like the outermost `SELECT` in a table subquery in regular Spark
SQL.
Window functions are supported in the `SELECT` list as well. To use them, the
`OVER` clause must be
provided. You may provide the window specification in the `WINDOW` clause.
+Aggregate functions are not supported in this operator. To perform
aggregation, use the `AGGREGATE`
+operator instead.
+
For example:
```sql
@@ -226,7 +236,12 @@ FROM t
|> EXTEND <expr> [[AS] alias], ...
```
-Appends new columns to the input table by evaluating the specified expressions
over each of the input rows.
+Appends new columns to the input table by evaluating the specified expressions
over each of the
+input rows.
+
+After an `EXTEND` operation, top-level column names are updated, but table aliases still refer to
+the original row values (for example, after an inner join between two tables `lhs` and `rhs`, a
+subsequent `EXTEND` may still be followed by `SELECT lhs.col, rhs.col`).
For example:
@@ -248,7 +263,17 @@ VALUES (0), (1) tab(col)
|> SET <column> = <expression>, ...
```
-Updates columns of the input table by replacing them with the result of
evaluating the provided expressions.
+Updates columns of the input table by replacing them with the result of
evaluating the provided
+expressions. Each referenced column must appear in the input table exactly once.
+
+This is similar to `SELECT * EXCEPT (column), <expression> AS column` in
regular Spark SQL.
+
+It is possible to perform multiple assignments in a single `SET` clause. Each
assignment may refer
+to the result of previous assignments.
+
+After an assignment, top-level column names are updated, but table aliases still refer to the
+original row values (for example, after an inner join between two tables `lhs` and `rhs`, a
+subsequent `SET` may still be followed by `SELECT lhs.col, rhs.col`).
For example:
@@ -256,6 +281,16 @@ For example:
VALUES (0), (1) tab(col)
|> SET col = col * 2;
++---+
+|col|
++---+
+| 0|
+| 2|
++---+
+
+VALUES (0), (1) tab(col)
+|> SET col = col * 2;
+
+---+
|col|
+---+
@@ -270,7 +305,14 @@ VALUES (0), (1) tab(col)
|> DROP <column>, ...
```
-Drops columns of the input table by name.
+Drops columns of the input table by name. Each referenced column must appear in the input table
+exactly once.
+
+This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL.
+
+After a `DROP` operation, top-level column names are updated, but table aliases still refer to the
+original row values (for example, after an inner join between two tables `lhs` and `rhs`, a
+subsequent `DROP` may still be followed by `SELECT lhs.col, rhs.col`).
For example:
@@ -293,18 +335,25 @@ VALUES (0, 1) tab(col1, col2)
Retains the same rows and column names of the input table but with a new table
alias.
+This operator is useful for introducing a new alias for the input table, which
can then be referred
+to in subsequent operators. Any existing alias for the table is replaced by
the new alias.
+
+This operator is especially useful after adding new columns with `SELECT` or `EXTEND`, or after
+performing aggregation with `AGGREGATE`: it simplifies referring to the columns in subsequent
+`JOIN` operators and allows for more readable queries.
+
For example:
```sql
VALUES (0, 1) tab(col1, col2)
-|> AS new_tab;
-|> SELECT * FROM new_tab;
+|> AS new_tab
+|> SELECT col1 + col2 FROM new_tab;
-+----+----+
-|col1|col2|
-+----+----+
-| 0| 1|
-+----+----+
++-----------+
+|col1 + col2|
++-----------+
+| 1|
++-----------+
```
#### WHERE
@@ -357,22 +406,48 @@ VALUES (0), (0) tab(col)
#### AGGREGATE
```sql
+-- Full-table aggregation
|> AGGREGATE <agg_expr> [[AS] alias], ...
-```
-
-Performs full-table aggregation, returning one result row with a column for
each aggregate expression.
-```sql
+-- Aggregation with grouping
|> AGGREGATE [<agg_expr> [[AS] alias], ...] GROUP BY <grouping_expr> [AS
alias], ...
```
-Performs aggregation with grouping, returning one row per group. The column
list includes the
-grouping columns first and then the aggregate columns afterward. Aliases can
be assigned directly
-on grouping expressions.
+Performs aggregation across grouped rows or across the entire input table.
+
+If no `GROUP BY` clause is present, this performs full-table aggregation,
returning one result row
+with a column for each aggregate expression. Otherwise, this performs
aggregation with grouping,
+returning one row per group. Aliases can be assigned directly on grouping
expressions.
+
+The output column list of this operator includes the grouping columns first
(if any), and then the
+aggregate columns afterward.
+
+Each `<agg_expr>` expression can include standard aggregate function(s) like
`COUNT`, `SUM`, `AVG`,
+`MIN`, or any other aggregate function(s) that Spark SQL supports. Additional
expressions may appear
+below or above the aggregate function(s), such as `MIN(FLOOR(col)) + 1`. Each
`<agg_expr>`
+expression must contain at least one aggregate function (otherwise the query returns an error).
+Each `<agg_expr>` expression may include a column alias with `AS <alias>`, and
may also
+include a `DISTINCT` keyword to remove duplicate values before applying the
aggregate function (for
+example, `COUNT(DISTINCT col)`).
+
+If present, the `GROUP BY` clause can include any number of grouping
expressions, and each
+`<agg_expr>` expression will evaluate over each unique combination of values
of the grouping
+expressions. The output table contains the evaluated grouping expressions
followed by the evaluated
+aggregate functions. The `GROUP BY` expressions may include one-based
ordinals. Unlike regular SQL
+in which such ordinals refer to the expressions in the accompanying `SELECT`
clause, in SQL pipe
+syntax, they refer to the columns of the relation produced by the preceding
operator instead. For
+example, in `TABLE t |> AGGREGATE COUNT(*) GROUP BY 2`, we refer to the second
column of the input
+table `t`.
+
+There is no need to repeat entire expressions between `GROUP BY` and `SELECT`,
since the `AGGREGATE`
+operator automatically includes the evaluated grouping expressions in its
output. By the same token,
+after an `AGGREGATE` operator, it is often unnecessary to issue a following
`SELECT` operator, since
+`AGGREGATE` returns both the grouping columns and the aggregate columns in a
single step.
For example:
```sql
+-- Full-table aggregation
VALUES (0), (1) tab(col)
|> AGGREGATE COUNT(col) AS count;
@@ -382,6 +457,7 @@ VALUES (0), (1) tab(col)
| 2|
+-----+
+-- Aggregation with grouping
VALUES (0, 1), (0, 2) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1;
@@ -398,19 +474,45 @@ VALUES (0, 1), (0, 2) tab(col1, col2)
|> [LEFT | RIGHT | FULL | CROSS | SEMI | ANTI | NATURAL | LATERAL] JOIN
<table> [ON <condition> | USING(col, ...)]
```
-Joins rows from both inputs, returning a filtered cross-product of the pipe
input table and the table expression following the JOIN keyword.
+Joins rows from both inputs, returning a filtered cross-product of the pipe
input table and the
+table expression following the JOIN keyword. This behaves in a similar manner to the `JOIN` clause
+in regular SQL, where the pipe operator input table becomes the left side of the
join and the table
+argument becomes the right side of the join.
+
+Standard join modifiers like `LEFT`, `RIGHT`, and `FULL` are supported before
the `JOIN` keyword.
+
+The join predicate may need to refer to columns from both inputs to the join.
In this case, it may
+be necessary to use table aliases to differentiate between columns in the
event that both inputs
+have columns with the same names. The `AS` operator can be useful here to
introduce a new alias for
+the pipe input table that becomes the left side of the join. Use standard
syntax to assign an alias
+to the table argument that becomes the right side of the join, if needed.
For example:
```sql
-VALUES (0, 1) tab(a, b)
-|> JOIN VALUES (0, 2) tab(c, d) ON a = c;
+SELECT 0 AS a, 1 AS b
+|> AS lhs
+|> JOIN VALUES (0, 2) rhs(a, b) ON (lhs.a = rhs.a);
+---+---+---+---+
| a| b| a| b|
+---+---+---+---+
| 0| 1| 0| 2|
+---+---+---+---+
+
+VALUES ('apples', 3), ('bananas', 4) t(item, sales)
+|> AS produce_sales
+|> LEFT JOIN
+ (SELECT "apples" AS item, 123 AS id) AS produce_data
+ USING (item)
+|> SELECT produce_sales.item, sales, id;
+
+/*---------+-------+------+
+ | item | sales | id |
+ +---------+-------+------+
+ | apples | 3 | 123 |
+ | bananas | 4 | NULL |
+ +---------+-------+------*/
```
#### ORDER BY
@@ -419,7 +521,8 @@ VALUES (0, 1) tab(a, b)
|> ORDER BY <expr> [ASC | DESC], ...
```
-Returns the input rows after sorting as indicated. Standard modifiers are
supported including NULLS FIRST/LAST.
+Returns the input rows after sorting as indicated. Standard modifiers are
supported including NULLS
+FIRST/LAST.
For example:
@@ -438,10 +541,10 @@ VALUES (0), (1) tab(col)
#### UNION, INTERSECT, EXCEPT
```sql
-|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>), (<query>), ...
+|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>)
```
-Performs the union or other set operation over the combined rows from the
input table plus one or more tables provided as input arguments.
+Performs the union or other set operation over the combined rows from the input table and the
+provided query argument.
For example:
@@ -469,12 +572,22 @@ For example:
```sql
VALUES (0), (0), (0), (0) tab(col)
-|> TABLESAMPLE BERNOULLI(1 ROWS);
+|> TABLESAMPLE (1 ROWS);
+
++---+
+|col|
++---+
+| 0|
++---+
+
+VALUES (0), (0) tab(col)
+|> TABLESAMPLE (100 PERCENT);
+---+
|col|
+---+
| 0|
+| 0|
+---+
```
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 36fd4d02f8da..56b2103c555d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -53,9 +53,10 @@ trait ColumnResolutionHelper extends Logging with
DataTypeErrorsBase {
(exprs, plan)
} else {
plan match {
- // For `Distinct` and `SubqueryAlias`, we can't recursively resolve
and add attributes
- // via its children.
- case u: UnaryNode if !u.isInstanceOf[Distinct] &&
!u.isInstanceOf[SubqueryAlias] =>
+ // For `Distinct` and `SubqueryAlias` and `PipeOperator`, we can't
recursively resolve and
+ // add attributes via its children.
+ case u: UnaryNode if !u.isInstanceOf[Distinct] &&
!u.isInstanceOf[SubqueryAlias]
+ && !u.isInstanceOf[PipeOperator] =>
val (newExprs, newChild) = {
// Resolving expressions against current plan.
val maybeResolvedExprs =
exprs.map(resolveExpressionByPlanOutput(_, u))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index fe8f0f264e85..40d7d24263a7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -18,6 +18,9 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR,
TreePattern}
import org.apache.spark.sql.errors.QueryCompilationErrors
/**
@@ -57,6 +60,25 @@ case class PipeExpression(child: Expression, isAggregate:
Boolean, clause: Strin
}
}
+/**
+ * Represents the location within a logical plan where a SQL pipe operator appeared.
+ * This acts as a logical boundary that prevents the analyzer from modifying the logical
+ * operators above and below the boundary.
+ */
+case class PipeOperator(child: LogicalPlan) extends UnaryNode {
+ final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR)
+ override def output: Seq[Attribute] = child.output
+ override def withNewChildInternal(newChild: LogicalPlan): PipeOperator =
copy(child = newChild)
+}
+
+/** This rule removes all PipeOperator nodes from a logical plan at the end of
analysis. */
+object EliminatePipeOperators extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+ _.containsPattern(PIPE_OPERATOR), ruleId) {
+ case PipeOperator(child) => child
+ }
+}
+
object PipeOperators {
// These are definitions of query result clauses that can be used with the
pipe operator.
val aggregateClause = "AGGREGATE"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b141d2be04c3..c0c76dd44ad5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -313,6 +313,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
private val rules = Seq(
EliminateResolvedHint,
EliminateSubqueryAliases,
+ EliminatePipeOperators,
EliminateView,
ReplaceExpressions,
RewriteNonCorrelatedExists,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 1f9c14830364..f4f6d2b310f4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -6028,17 +6028,6 @@ class AstBuilder extends DataTypeAstBuilder
if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
- // This helper function adds a table subquery boundary between the new
operator to be added
- // (such as a filter or sort) and the input plan if one does not already
exist. This helps the
- // analyzer behave as if we had added the corresponding SQL clause after a
table subquery
- // containing the input plan.
- def withSubqueryAlias(): LogicalPlan = left match {
- case _: SubqueryAlias | _: UnresolvedRelation | _: Join | _: Filter |
- _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort =>
- left
- case _ =>
- SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
- }
Option(ctx.selectClause).map { c =>
withSelectQuerySpecification(
ctx = ctx,
@@ -6082,7 +6071,7 @@ class AstBuilder extends DataTypeAstBuilder
if (ctx.windowClause() != null) {
throw
QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
}
- withWhereClause(c, withSubqueryAlias())
+ withWhereClause(c, PipeOperator(left))
}.getOrElse(Option(ctx.pivotClause()).map { c =>
if (ctx.unpivotClause() != null) {
throw
QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
@@ -6101,7 +6090,7 @@ class AstBuilder extends DataTypeAstBuilder
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.getOrElse(Option(ctx.queryOrganization).map { c =>
- withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
+ withQueryResultClauses(c, PipeOperator(left), forPipeOperators = true)
}.getOrElse(
visitOperatorPipeAggregate(ctx, left)
))))))))))))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index 0918306de62e..3f79e74b18a4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -108,6 +108,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences" ::
"org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn"
::
+ "org.apache.spark.sql.catalyst.expressions.EliminatePipeOperators" ::
// Catalyst Optimizer rules
"org.apache.spark.sql.catalyst.optimizer.BooleanSimplification" ::
"org.apache.spark.sql.catalyst.optimizer.CollapseProject" ::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 80531da4a0ab..25ef341b8cef 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -79,6 +79,7 @@ object TreePattern extends Enumeration {
val OUTER_REFERENCE: Value = Value
val PARAMETER: Value = Value
val PARAMETERIZED_QUERY: Value = Value
+ val PIPE_OPERATOR: Value = Value
val PIVOT: Value = Value
val PLAN_EXPRESSION: Value = Value
val PYTHON_UDF: Value = Value
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 70de582fb7b2..ac74fea1dbfb 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -306,7 +306,7 @@ from t as t_alias
-- !query analysis
Project [tx#x]
+- Filter (ty#x = def)
- +- SubqueryAlias __auto_generated_subquery_name
+ +- PipeOperator
+- Project [pipeexpression(x#x, false, SELECT) AS tx#x,
pipeexpression(y#x, false, SELECT) AS ty#x]
+- SubqueryAlias t_alias
+- SubqueryAlias spark_catalog.default.t
@@ -769,8 +769,9 @@ Filter exists#x [x#x]
: +- Project [a#x, b#x, pipeexpression(outer(x#x), false, EXTEND) AS
pipeexpression(outer(spark_catalog.default.t.x))#x]
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1002,6 +1003,38 @@ Project [x#x, y#x, z#x]
+- Relation spark_catalog.default.t[x#x,y#x] csv
+-- !query
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2
+-- !query analysis
+Project [a#x, a#x, z2#x]
++- GlobalLimit 2
+ +- LocalLimit 2
+ +- PipeOperator
+ +- Project [a#x, pipeexpression(4, false, SET) AS z2#x, a#x]
+ +- Sort [a#x ASC NULLS FIRST, a#x ASC NULLS FIRST, z2#x ASC NULLS
FIRST], true
+ +- PipeOperator
+ +- Filter (z2#x = 0)
+ +- PipeOperator
+ +- Project [a#x, z2#x, a#x]
+ +- Project [a#x, z1#x, pipeexpression((a#x - a#x),
false, EXTEND) AS z2#x, a#x]
+ +- Project [a#x, pipeexpression((a#x + a#x),
false, EXTEND) AS z1#x, a#x]
+ +- Project [a#x, a#x, a#x]
+ +- Join Inner, (a#x = a#x)
+ :- SubqueryAlias lhs
+ : +- LocalRelation [a#x]
+ +- SubqueryAlias rhs
+ +- LocalRelation [a#x]
+
+
-- !query
table t
|> set z = 1
@@ -1241,9 +1274,10 @@ table t
|> where u.x = 1
-- !query analysis
Filter (x#x = 1)
-+- SubqueryAlias u
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias u
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1325,8 +1359,9 @@ table t
|> where true
-- !query analysis
Filter true
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1334,8 +1369,9 @@ table t
|> where x + length(y) < 4
-- !query analysis
Filter ((x#x + length(y#x)) < 4)
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1344,9 +1380,11 @@ table t
|> where x + length(y) < 3
-- !query analysis
Filter ((x#x + length(y#x)) < 3)
-+- Filter ((x#x + length(y#x)) < 4)
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- Filter ((x#x + length(y#x)) < 4)
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1354,7 +1392,7 @@ Filter ((x#x + length(y#x)) < 3)
|> where x = 1
-- !query analysis
Filter (x#x = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Aggregate [x#x], [x#x, sum(length(y#x)) AS sum_len#xL]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -1365,8 +1403,9 @@ table t
|> where t.x = 1
-- !query analysis
Filter (x#x = 1)
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1374,8 +1413,9 @@ table t
|> where spark_catalog.default.t.x = 1
-- !query analysis
Filter (x#x = 1)
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1383,7 +1423,7 @@ Filter (x#x = 1)
|> where col.i1 = 1
-- !query analysis
Filter (col#x.i1 = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Project [col#x]
+- SubqueryAlias spark_catalog.default.st
+- Relation spark_catalog.default.st[x#x,col#x] parquet
@@ -1394,8 +1434,9 @@ table st
|> where st.col.i1 = 2
-- !query analysis
Filter (col#x.i1 = 2)
-+- SubqueryAlias spark_catalog.default.st
- +- Relation spark_catalog.default.st[x#x,col#x] parquet
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.st
+ +- Relation spark_catalog.default.st[x#x,col#x] parquet
-- !query
@@ -1409,8 +1450,9 @@ Filter exists#x [x#x]
: +- Filter (outer(x#x) = a#x)
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1424,8 +1466,9 @@ Filter (scalar-subquery#x [x#x] = 1)
: +- Filter (outer(x#x) = a#x)
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#x,b#x] json
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -1527,7 +1570,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1551,7 +1594,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1575,7 +1618,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1599,7 +1642,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1621,7 +1664,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `sum_len`"
+ "proposal" : "`sum_len`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -2583,20 +2626,21 @@ table natural_join_test_t1
|> where k = "one"
-- !query analysis
Filter (k#x = one)
-+- Project [k#x, v1#x, v2#x]
- +- Join Inner, (k#x = k#x)
- :- SubqueryAlias natural_join_test_t1
- : +- View (`natural_join_test_t1`, [k#x, v1#x])
- : +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS v1#x]
- : +- Project [k#x, v1#x]
- : +- SubqueryAlias natural_join_test_t1
- : +- LocalRelation [k#x, v1#x]
- +- SubqueryAlias natural_join_test_t2
- +- View (`natural_join_test_t2`, [k#x, v2#x])
- +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS v2#x]
- +- Project [k#x, v2#x]
- +- SubqueryAlias natural_join_test_t2
- +- LocalRelation [k#x, v2#x]
++- PipeOperator
+ +- Project [k#x, v1#x, v2#x]
+ +- Join Inner, (k#x = k#x)
+ :- SubqueryAlias natural_join_test_t1
+ : +- View (`natural_join_test_t1`, [k#x, v1#x])
+ : +- Project [cast(k#x as string) AS k#x, cast(v1#x as int) AS
v1#x]
+ : +- Project [k#x, v1#x]
+ : +- SubqueryAlias natural_join_test_t1
+ : +- LocalRelation [k#x, v1#x]
+ +- SubqueryAlias natural_join_test_t2
+ +- View (`natural_join_test_t2`, [k#x, v2#x])
+ +- Project [cast(k#x as string) AS k#x, cast(v2#x as int) AS
v2#x]
+ +- Project [k#x, v2#x]
+ +- SubqueryAlias natural_join_test_t2
+ +- LocalRelation [k#x, v2#x]
-- !query
@@ -2774,7 +2818,7 @@ values (2, 'xyz') tab(x, y)
|> where x = 0
-- !query analysis
Filter (x#x = 0)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Distinct
+- Union false, false
:- SubqueryAlias tab
@@ -2932,8 +2976,9 @@ table t
|> order by x
-- !query analysis
Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -2941,7 +2986,7 @@ Sort [x#x ASC NULLS FIRST], true
|> order by x
-- !query analysis
Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv
@@ -2952,8 +2997,9 @@ values (0, 'abc') tab(x, y)
|> order by x
-- !query analysis
Sort [x#x ASC NULLS FIRST], true
-+- SubqueryAlias tab
- +- LocalRelation [x#x, y#x]
++- PipeOperator
+ +- SubqueryAlias tab
+ +- LocalRelation [x#x, y#x]
-- !query
@@ -2963,9 +3009,11 @@ table t
-- !query analysis
GlobalLimit 1
+- LocalLimit 1
- +- Sort [x#x ASC NULLS FIRST], true
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- Sort [x#x ASC NULLS FIRST], true
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -2977,11 +3025,12 @@ table t
GlobalLimit 2
+- LocalLimit 2
+- Offset 1
- +- SubqueryAlias __auto_generated_subquery_name
+ +- PipeOperator
+- Project [y#x]
+- Filter (x#x = 1)
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -2991,11 +3040,12 @@ table t
|> offset 1
-- !query analysis
Offset 1
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Project [y#x]
+- Filter (x#x = 1)
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3003,8 +3053,9 @@ table t
|> limit all offset 0
-- !query analysis
Offset 0
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3012,8 +3063,9 @@ table t
|> distribute by x
-- !query analysis
RepartitionByExpression [x#x]
-+- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
++- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3022,8 +3074,9 @@ table t
-- !query analysis
Sort [x#x ASC NULLS FIRST], false
+- RepartitionByExpression [x#x]
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3032,8 +3085,9 @@ table t
-- !query analysis
RepartitionByExpression [x#x]
+- Sort [x#x ASC NULLS FIRST], false
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3043,8 +3097,9 @@ order by y
-- !query analysis
Sort [y#x ASC NULLS FIRST], true
+- Sort [x#x DESC NULLS LAST], true
- +- SubqueryAlias spark_catalog.default.t
- +- Relation spark_catalog.default.t[x#x,y#x] csv
+ +- PipeOperator
+ +- SubqueryAlias spark_catalog.default.t
+ +- Relation spark_catalog.default.t[x#x,y#x] csv
-- !query
@@ -3312,7 +3367,7 @@ table other
|> where a = 1
-- !query analysis
Filter (a#x = 1)
-+- SubqueryAlias __auto_generated_subquery_name
++- PipeOperator
+- Aggregate [a#x], [a#x]
+- SubqueryAlias spark_catalog.default.other
+- Relation spark_catalog.default.other[a#x,b#x] json
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index ec4afc6b2372..0cae29d722a8 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -359,6 +359,18 @@ table t
|> extend 1 as z
|> set z = first_value(x) over (partition by y);
+-- Any prior table aliases remain visible after a SET operator.
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2;
+
-- SET operators: negative tests.
---------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 3dd212d889f9..0d5ec57b9e47 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -940,6 +940,23 @@ struct<x:int,y:string,z:int>
1 def 1
+-- !query
+values (0), (1) lhs(a)
+|> inner join values (1), (2) rhs(a) using (a)
+|> extend lhs.a + rhs.a as z1
+|> extend lhs.a - rhs.a as z2
+|> drop z1
+|> where z2 = 0
+|> order by lhs.a, rhs.a, z2
+|> set z2 = 4
+|> limit 2
+|> select lhs.a, rhs.a, z2
+-- !query schema
+struct<a:int,a:int,z2:int>
+-- !query output
+1 1 4
+
+
-- !query
table t
|> set z = 1
@@ -1481,7 +1498,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1507,7 +1524,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1533,7 +1550,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1559,7 +1576,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `z`"
+ "proposal" : "`z`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
@@ -1583,7 +1600,7 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"sqlState" : "42703",
"messageParameters" : {
"objectName" : "`y`",
- "proposal" : "`x`, `sum_len`"
+ "proposal" : "`sum_len`, `spark_catalog`.`default`.`t`.`x`"
},
"queryContext" : [ {
"objectType" : "",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]