This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0f27d73b6c8b  [SPARK-49565][SQL] Add SQL pipe syntax for the FROM operator
0f27d73b6c8b is described below

commit 0f27d73b6c8b4139220fd0da33d5e8f0973283be
Author: Jiashen Cao <jiash...@gatech.edu>
AuthorDate: Tue Dec 10 20:03:04 2024 -0800

[SPARK-49565][SQL] Add SQL pipe syntax for the FROM operator

### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for the FROM operator.

Note: this PR includes all content in https://github.com/apache/spark/pull/48724 and also fixes the remaining tests.

For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');
CREATE TABLE other(a INT, b INT) USING JSON;
INSERT INTO other VALUES (1, 1), (1, 2), (2, 4);

FROM t
|> SELECT 1 AS X;
```

### Why are the changes needed?

This allows users to use FROM in the pipe syntax format.

### Does this PR introduce any user-facing change?

Yes, as indicated in the example.

### How was this patch tested?

* Unit tests in `SparkSqlParserSuite` and `PlanParserSuite`.
* Golden-file based end-to-end tests in `pipe-operators.sql`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49120 from dtenedor/pipe-syntax-from-clause.

Lead-authored-by: Jiashen Cao <jiash...@gatech.edu>
Co-authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   5 +-
 .../sql/catalyst/parser/PlanParserSuite.scala      |  16 +--
 .../analyzer-results/pipe-operators.sql.out        | 146 +++++++++++++++++++--
 .../resources/sql-tests/inputs/pipe-operators.sql  |  67 ++++++++--
 .../sql-tests/results/pipe-operators.sql.out       | 143 ++++++++++++++++++--
 .../spark/sql/execution/SparkSqlParserSuite.scala  |   3 +
 7 files changed, 339 insertions(+), 43 deletions(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index a0f447dba798..8ef7ab90c6ff 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -648,7 +648,7 @@ sortItem
     ;
 
 fromStatement
-    : fromClause fromStatementBody+
+    : fromClause fromStatementBody*
     ;
 
 fromStatementBody
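The one-character grammar change above is the heart of the patch: relaxing `fromStatementBody+` to `fromStatementBody*` makes the body following a FROM clause optional, so a bare FROM clause now parses as a complete query. As a minimal sketch of the resulting behavior, reusing the table `t` from the commit-message example (the golden files further down show both statements returning every row of `t`):

```
-- With this change, each of these parses and behaves like SELECT * FROM t.
FROM t;
TABLE t;
```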
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fad4fcefc1d1..47139810528d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -548,7 +548,10 @@ class AstBuilder extends DataTypeAstBuilder
       optionalMap(body.queryOrganization)(withQueryResultClauses(_, _, forPipeOperators = false))
     }
     // If there are multiple SELECT just UNION them together into one query.
-    if (selects.length == 1) {
+    if (selects.length == 0) {
+      // This is a "FROM <tableName>" clause with no other syntax.
+      from
+    } else if (selects.length == 1) {
       selects.head
     } else {
       Union(selects.toSeq)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index c556a9237395..9e5555c4c6c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -323,19 +323,9 @@ class PlanParserSuite extends AnalysisTest {
     assertEqual(
       "from db.a select b, c where d < 1",
       table("db", "a").where($"d" < 1).select($"b", $"c"))
     assertEqual("from a select distinct b, c", Distinct(table("a").select($"b", $"c")))
-
-    // Weird "FROM table" queries, should be invalid anyway
-    val sql1 = "from a"
-    checkError(
-      exception = parseException(sql1),
-      condition = "PARSE_SYNTAX_ERROR",
-      parameters = Map("error" -> "end of input", "hint" -> ""))
-
-    val sql2 = "from (from a union all from b) c select *"
-    checkError(
-      exception = parseException(sql2),
-      condition = "PARSE_SYNTAX_ERROR",
-      parameters = Map("error" -> "'union'", "hint" -> ""))
+    assertEqual("from a", table("a"))
+    assertEqual("from (from a union all from b) c select *",
+      table("a").union(table("b")).subquery("c").select(star()))
   }
 
   test("multi select query") {
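The parser tests above replace the old "should be invalid anyway" error checks with plan assertions. Read back as plain SQL, the asserted plans amount to the following informal equivalences (a hand-written sketch implied by `table("a")` and `table("a").union(table("b")).subquery("c").select(star())`, not output from this patch):

```
-- "from a" now parses as a bare table scan, i.e.:
SELECT * FROM a;

-- "from (from a union all from b) c select *" now parses as:
SELECT * FROM (SELECT * FROM a UNION ALL SELECT * FROM b) AS c;
```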
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 1e1ad90946f8..1121d8baf5db 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -265,6 +265,131 @@ CreateViewCommand `windowTestData`, select * from values
 +- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
 
 
+-- !query
+from t
+-- !query analysis
+SubqueryAlias spark_catalog.default.t
++- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+-- !query analysis
+SubqueryAlias spark_catalog.default.t
++- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t
+|> select 1 as x
+-- !query analysis
+Project [pipeexpression(1, false, SELECT) AS x#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x
+-- !query analysis
+Project [x#x]
++- SubqueryAlias t_alias
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx
+-- !query analysis
+Project [tx#x]
++- Filter (ty#x = def)
+   +- SubqueryAlias __auto_generated_subquery_name
+      +- Project [pipeexpression(x#x, false, SELECT) AS tx#x, pipeexpression(y#x, false, SELECT) AS ty#x]
+         +- SubqueryAlias t_alias
+            +- SubqueryAlias spark_catalog.default.t
+               +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t, other
+|> select t.x + other.a as z
+-- !query analysis
+Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
++- Join Inner
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z
+-- !query analysis
+Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
++- Join Inner, (x#x = a#x)
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z
+-- !query analysis
+Project [pipeexpression((x#x + ly#x), false, SELECT) AS z#x]
++- Generate explode(array(100, 101)), false, as, [ly#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from st
+|> select col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias spark_catalog.default.st
+   +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+from st as st_alias
+|> select st_alias.col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias st_alias
+   +- SubqueryAlias spark_catalog.default.st
+      +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+from values (0), (1) tab(col)
+|> select col as x
+-- !query analysis
+Project [pipeexpression(col#x, false, SELECT) AS x#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+from t
+|> from t
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'from'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> select 1 as x
@@ -3661,13 +3786,12 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 -- !query
 with customer_total_return as
-  (table store_returns
+  (from store_returns
    |> join date_dim
    |> where sr_returned_date_sk = d_date_sk and d_year = 2000
    |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -3692,8 +3816,8 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 40,
-    "stopIndex" : 52,
+    "startIndex" : 39,
+    "stopIndex" : 51,
     "fragment" : "store_returns"
   } ]
 }
@@ -4109,13 +4233,12 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 
 -- !query
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
        group by ss_item_sk as item_sk
 |> where rank_col > 0.9 *
    (
-    table store_sales
+    from store_sales
     |> where ss_store_sk = 4 and ss_addr_sk is null
     |> aggregate avg(ss_net_profit) rank_col
@@ -4130,8 +4253,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-    table store_sales
-    |> as ss1
+    from store_sales ss1
     |> where ss_store_sk = 4
     |> aggregate avg(ss_net_profit) rank_col
          group by ss_item_sk as item_sk
@@ -4170,8 +4292,8 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 7,
-    "stopIndex" : 17,
+    "startIndex" : 6,
+    "stopIndex" : 16,
     "fragment" : "store_sales"
   } ]
 }
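The multi-step `t_alias` plan above, a Filter over a Project beneath `__auto_generated_subquery_name`, has the same shape the analyzer would produce for an ordinary nested query. As an informal equivalence implied by that analyzed plan (a sketch, not part of the patch):

```
-- The pipelined query...
FROM t AS t_alias
|> SELECT t_alias.x AS tx, t_alias.y AS ty
|> WHERE ty = 'def'
|> SELECT tx;

-- ...analyzes to the same plan shape as:
SELECT tx
FROM (SELECT t_alias.x AS tx, t_alias.y AS ty
      FROM t AS t_alias) AS __auto_generated_subquery_name
WHERE ty = 'def';
```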
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 924b42d9d305..1299da3020d5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -71,6 +71,60 @@ create temporary view windowTestData as select * from values
   (3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
   AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
 
+-- FROM operators: positive tests.
+----------------------------------
+
+-- FromClause alone.
+from t;
+
+-- Table alone.
+table t;
+
+-- Selecting from a constant.
+from t
+|> select 1 as x;
+
+-- Selecting using a table alias.
+from t as t_alias
+|> select t_alias.x;
+
+-- Selecting using a table alias.
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx;
+
+-- Selecting from multiple relations.
+from t, other
+|> select t.x + other.a as z;
+
+-- Selecting from multiple relations with join.
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z;
+
+-- Selecting from lateral view.
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z;
+
+-- Selecting struct fields.
+from st
+|> select col.i1;
+
+-- Selecting struct fields using a table alias.
+from st as st_alias
+|> select st_alias.col.i1;
+
+-- Selecting from a VALUES list.
+from values (0), (1) tab(col)
+|> select col as x;
+
+-- FROM operators: negative tests.
+----------------------------------
+
+-- It is not possible to use the FROM operator accepting an input relation.
+from t
+|> from t;
+
 -- SELECT operators: positive tests.
 ---------------------------------------
 
@@ -1158,13 +1212,12 @@ order by c_customer_id
 limit 100;
 
 with customer_total_return as
-  (table store_returns
+  (from store_returns
    |> join date_dim
    |> where sr_returned_date_sk = d_date_sk and d_year = 2000
    |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -1466,13 +1519,12 @@ where asceding.rnk = descending.rnk
 order by asceding.rnk
 limit 100;
 
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
        group by ss_item_sk as item_sk
 |> where rank_col > 0.9 *
    (
-    table store_sales
+    from store_sales
     |> where ss_store_sk = 4 and ss_addr_sk is null
     |> aggregate avg(ss_net_profit) rank_col
@@ -1487,8 +1539,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-    table store_sales
-    |> as ss1
+    from store_sales ss1
     |> where ss_store_sk = 4
     |> aggregate avg(ss_net_profit) rank_col
          group by ss_item_sk as item_sk
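The TPC-DS rewrites above also exercise a second consequence of the change: a pipe query can now start with `from <table> <alias>` directly, so the two-step `table x |> as y` spelling collapses into the FROM clause. A small sketch of the idiom using the toy table `t`; treating all three spellings as interchangeable is an inference from the `from customer_total_return ctr1` and `from store_sales ss1` rewrites, not something this suite asserts for `t` itself:

```
-- Three spellings of the same aliased scan:
TABLE t
|> AS t1
|> SELECT t1.x;

FROM t AS t1
|> SELECT t1.x;

FROM t t1
|> SELECT t1.x;
```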
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 570b61f388ea..cc603903712a 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -232,6 +232,136 @@ struct<>
 
 
 
+-- !query
+from t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+from t
+|> select 1 as x
+-- !query schema
+struct<x:int>
+-- !query output
+1
+1
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx
+-- !query schema
+struct<tx:int>
+-- !query output
+1
+
+
+-- !query
+from t, other
+|> select t.x + other.a as z
+-- !query schema
+struct<z:int>
+-- !query output
+1
+1
+2
+2
+2
+3
+
+
+-- !query
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z
+-- !query schema
+struct<z:int>
+-- !query output
+2
+2
+
+
+-- !query
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z
+-- !query schema
+struct<z:int>
+-- !query output
+100
+101
+101
+102
+
+
+-- !query
+from st
+|> select col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+from st as st_alias
+|> select st_alias.col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+from values (0), (1) tab(col)
+|> select col as x
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+from t
+|> from t
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'from'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> select 1 as x
@@ -3344,13 +3474,12 @@ struct<c_customer_id:string>
 
 -- !query
 with customer_total_return as
-  (table store_returns
+  (from store_returns
    |> join date_dim
    |> where sr_returned_date_sk = d_date_sk and d_year = 2000
    |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -3696,13 +3825,12 @@ struct<rnk:int,best_performing:string,worst_performing:string>
 
 
 -- !query
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
        group by ss_item_sk as item_sk
 |> where rank_col > 0.9 *
    (
-    table store_sales
+    from store_sales
     |> where ss_store_sk = 4 and ss_addr_sk is null
     |> aggregate avg(ss_net_profit) rank_col
@@ -3717,8 +3845,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-    table store_sales
-    |> as ss1
+    from store_sales ss1
     |> where ss_store_sk = 4
     |> aggregate avg(ss_net_profit) rank_col
          group by ss_item_sk as item_sk

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 36a003883a77..e698c50e5631 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -901,6 +901,9 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession {
     checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y")
     checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y, X + 2 Z")
     checkPipeSelect("TABLE t |> EXTEND 1 AS z, 2 AS Z |> SET z = 1, Z = 2")
+    // FROM operators.
+    def checkPipeSelectFrom(query: String): Unit = check(query, Seq(PROJECT))
+    checkPipeSelectFrom("FROM t |> SELECT 1 AS X")
     // Basic WHERE operators.
     def checkPipeWhere(query: String): Unit = check(query, Seq(FILTER))
     checkPipeWhere("TABLE t |> WHERE X = 1")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org