This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f27d73b6c8b [SPARK-49565][SQL] Add SQL pipe syntax for the FROM operator
0f27d73b6c8b is described below

commit 0f27d73b6c8b4139220fd0da33d5e8f0973283be
Author: Jiashen Cao <jiash...@gatech.edu>
AuthorDate: Tue Dec 10 20:03:04 2024 -0800

    [SPARK-49565][SQL] Add SQL pipe syntax for the FROM operator
    
    ### What changes were proposed in this pull request?
    
    This PR adds SQL pipe syntax support for the FROM operator.
    
    Note: this PR includes all the content of https://github.com/apache/spark/pull/48724 and also fixes the remaining tests.
    
    For example:
    
    ```
    CREATE TABLE t(x INT, y STRING) USING CSV;
    INSERT INTO t VALUES (0, 'abc'), (1, 'def');
    
    CREATE TABLE other(a INT, b INT) USING JSON;
    INSERT INTO other VALUES (1, 1), (1, 2), (2, 4);
    
    FROM t
    |> SELECT 1 AS X;
    ```
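    The FROM clause also composes with the other pipe operators. For example, this chained query from the new golden-file tests in `pipe-operators.sql` filters the piped relation before the final projection:
    
    ```
    FROM t AS t_alias
    |> SELECT t_alias.x AS tx, t_alias.y AS ty
    |> WHERE ty = 'def'
    |> SELECT tx;
    ```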
    
    ### Why are the changes needed?
    
    This allows users to begin a SQL pipe-syntax query with a standalone FROM clause.
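    
    As a sketch of the behavior change: a bare FROM clause used to be a parse error, whereas it now evaluates like the equivalent TABLE clause (both variants appear in the golden-file tests below):
    
    ```
    FROM t;   -- before this PR: PARSE_SYNTAX_ERROR at end of input
    TABLE t;  -- both now return all rows of t: (0, 'abc') and (1, 'def')
    ```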
    
    ### Does this PR introduce any user-facing change?
    
    Yes, as indicated in the example.
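    
    Note that FROM is only accepted at the start of a query; as the negative tests below confirm, it is not itself a pipe operator and cannot accept a piped input relation:
    
    ```
    FROM t
    |> FROM t;  -- PARSE_SYNTAX_ERROR
    ```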
    
    ### How was this patch tested?
    
    * Unit tests in `SparkSqlParserSuite` and `PlanParserSuite`.
    * Golden-file-based end-to-end tests in `pipe-operators.sql`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49120 from dtenedor/pipe-syntax-from-clause.
    
    Lead-authored-by: Jiashen Cao <jiash...@gatech.edu>
    Co-authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   5 +-
 .../sql/catalyst/parser/PlanParserSuite.scala      |  16 +--
 .../analyzer-results/pipe-operators.sql.out        | 146 +++++++++++++++++++--
 .../resources/sql-tests/inputs/pipe-operators.sql  |  67 ++++++++--
 .../sql-tests/results/pipe-operators.sql.out       | 143 ++++++++++++++++++--
 .../spark/sql/execution/SparkSqlParserSuite.scala  |   3 +
 7 files changed, 339 insertions(+), 43 deletions(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index a0f447dba798..8ef7ab90c6ff 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -648,7 +648,7 @@ sortItem
     ;
 
 fromStatement
-    : fromClause fromStatementBody+
+    : fromClause fromStatementBody*
     ;
 
 fromStatementBody
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fad4fcefc1d1..47139810528d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -548,7 +548,10 @@ class AstBuilder extends DataTypeAstBuilder
        optionalMap(body.queryOrganization)(withQueryResultClauses(_, _, forPipeOperators = false))
     }
     // If there are multiple SELECT just UNION them together into one query.
-    if (selects.length == 1) {
+    if (selects.length == 0) {
+      // This is a "FROM <tableName>" clause with no other syntax.
+      from
+    } else if (selects.length == 1) {
       selects.head
     } else {
       Union(selects.toSeq)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index c556a9237395..9e5555c4c6c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -323,19 +323,9 @@ class PlanParserSuite extends AnalysisTest {
     assertEqual(
       "from db.a select b, c where d < 1", table("db", "a").where($"d" < 
1).select($"b", $"c"))
     assertEqual("from a select distinct b, c", 
Distinct(table("a").select($"b", $"c")))
-
-    // Weird "FROM table" queries, should be invalid anyway
-    val sql1 = "from a"
-    checkError(
-      exception = parseException(sql1),
-      condition = "PARSE_SYNTAX_ERROR",
-      parameters = Map("error" -> "end of input", "hint" -> ""))
-
-    val sql2 = "from (from a union all from b) c select *"
-    checkError(
-      exception = parseException(sql2),
-      condition = "PARSE_SYNTAX_ERROR",
-      parameters = Map("error" -> "'union'", "hint" -> ""))
+    assertEqual("from a", table("a"))
+    assertEqual("from (from a union all from b) c select *",
+      table("a").union(table("b")).subquery("c").select(star()))
   }
 
   test("multi select query") {
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 1e1ad90946f8..1121d8baf5db 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -265,6 +265,131 @@ CreateViewCommand `windowTestData`, select * from values
         +- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
 
 
+-- !query
+from t
+-- !query analysis
+SubqueryAlias spark_catalog.default.t
++- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+-- !query analysis
+SubqueryAlias spark_catalog.default.t
++- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t
+|> select 1 as x
+-- !query analysis
+Project [pipeexpression(1, false, SELECT) AS x#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x
+-- !query analysis
+Project [x#x]
++- SubqueryAlias t_alias
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx
+-- !query analysis
+Project [tx#x]
++- Filter (ty#x = def)
+   +- SubqueryAlias __auto_generated_subquery_name
+      +- Project [pipeexpression(x#x, false, SELECT) AS tx#x, pipeexpression(y#x, false, SELECT) AS ty#x]
+         +- SubqueryAlias t_alias
+            +- SubqueryAlias spark_catalog.default.t
+               +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from t, other
+|> select t.x + other.a as z
+-- !query analysis
+Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
++- Join Inner
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z
+-- !query analysis
+Project [pipeexpression((x#x + a#x), false, SELECT) AS z#x]
++- Join Inner, (x#x = a#x)
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.other
+      +- Relation spark_catalog.default.other[a#x,b#x] json
+
+
+-- !query
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z
+-- !query analysis
+Project [pipeexpression((x#x + ly#x), false, SELECT) AS z#x]
++- Generate explode(array(100, 101)), false, as, [ly#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+from st
+|> select col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias spark_catalog.default.st
+   +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+from st as st_alias
+|> select st_alias.col.i1
+-- !query analysis
+Project [col#x.i1 AS i1#x]
++- SubqueryAlias st_alias
+   +- SubqueryAlias spark_catalog.default.st
+      +- Relation spark_catalog.default.st[x#x,col#x] parquet
+
+
+-- !query
+from values (0), (1) tab(col)
+|> select col as x
+-- !query analysis
+Project [pipeexpression(col#x, false, SELECT) AS x#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+from t
+|> from t
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'from'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> select 1 as x
@@ -3661,13 +3786,12 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 -- !query
 with customer_total_return as
-  (table store_returns
+  (from store_returns
   |> join date_dim
   |> where sr_returned_date_sk = d_date_sk and d_year = 2000
   |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -3692,8 +3816,8 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 40,
-    "stopIndex" : 52,
+    "startIndex" : 39,
+    "stopIndex" : 51,
     "fragment" : "store_returns"
   } ]
 }
@@ -4109,13 +4233,12 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 
 -- !query
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
      group by ss_item_sk as item_sk
 |> where rank_col > 0.9 * (
-     table store_sales
+     from store_sales
      |> where ss_store_sk = 4
           and ss_addr_sk is null
      |> aggregate avg(ss_net_profit) rank_col
@@ -4130,8 +4253,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-     table store_sales
-     |> as ss1
+     from store_sales ss1
      |> where ss_store_sk = 4
      |> aggregate avg(ss_net_profit) rank_col
           group by ss_item_sk as item_sk
@@ -4170,8 +4292,8 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
-    "startIndex" : 7,
-    "stopIndex" : 17,
+    "startIndex" : 6,
+    "stopIndex" : 16,
     "fragment" : "store_sales"
   } ]
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
index 924b42d9d305..1299da3020d5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
@@ -71,6 +71,60 @@ create temporary view windowTestData as select * from values
   (3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
   AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
 
+-- FROM operators: positive tests.
+----------------------------------
+
+-- FromClause alone.
+from t;
+
+-- Table alone.
+table t;
+
+-- Selecting from a constant.
+from t
+|> select 1 as x;
+
+-- Selecting using a table alias.
+from t as t_alias
+|> select t_alias.x;
+
+-- Selecting using a table alias.
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx;
+
+-- Selecting from multiple relations.
+from t, other
+|> select t.x + other.a as z;
+
+-- Selecting from multiple relations with join.
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z;
+
+-- Selecting from lateral view.
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z;
+
+-- Selecting struct fields.
+from st
+|> select col.i1;
+
+-- Selecting struct fields using a table alias.
+from st as st_alias
+|> select st_alias.col.i1;
+
+-- Selecting from a VALUES list.
+from values (0), (1) tab(col)
+|> select col as x;
+
+-- FROM operators: negative tests.
+----------------------------------
+
+-- It is not possible to use the FROM operator as a pipe operator accepting an input relation.
+from t
+|> from t;
+
 -- SELECT operators: positive tests.
 ---------------------------------------
 
@@ -1158,13 +1212,12 @@ order by c_customer_id
 limit 100;
 
 with customer_total_return as
-  (table store_returns
+  (from store_returns
   |> join date_dim
   |> where sr_returned_date_sk = d_date_sk and d_year = 2000
   |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -1466,13 +1519,12 @@ where asceding.rnk = descending.rnk
 order by asceding.rnk
 limit 100;
 
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
      group by ss_item_sk as item_sk
 |> where rank_col > 0.9 * (
-     table store_sales
+     from store_sales
      |> where ss_store_sk = 4
           and ss_addr_sk is null
      |> aggregate avg(ss_net_profit) rank_col
@@ -1487,8 +1539,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-     table store_sales
-     |> as ss1
+     from store_sales ss1
      |> where ss_store_sk = 4
      |> aggregate avg(ss_net_profit) rank_col
           group by ss_item_sk as item_sk
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 570b61f388ea..cc603903712a 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -232,6 +232,136 @@ struct<>
 
 
 
+-- !query
+from t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0      abc
+1      def
+
+
+-- !query
+from t
+|> select 1 as x
+-- !query schema
+struct<x:int>
+-- !query output
+1
+1
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+from t as t_alias
+|> select t_alias.x as tx, t_alias.y as ty
+|> where ty = 'def'
+|> select tx
+-- !query schema
+struct<tx:int>
+-- !query output
+1
+
+
+-- !query
+from t, other
+|> select t.x + other.a as z
+-- !query schema
+struct<z:int>
+-- !query output
+1
+1
+2
+2
+2
+3
+
+
+-- !query
+from t join other on (t.x = other.a)
+|> select t.x + other.a as z
+-- !query schema
+struct<z:int>
+-- !query output
+2
+2
+
+
+-- !query
+from t lateral view explode(array(100, 101)) as ly
+|> select t.x + ly as z
+-- !query schema
+struct<z:int>
+-- !query output
+100
+101
+101
+102
+
+
+-- !query
+from st
+|> select col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+from st as st_alias
+|> select st_alias.col.i1
+-- !query schema
+struct<i1:int>
+-- !query output
+2
+
+
+-- !query
+from values (0), (1) tab(col)
+|> select col as x
+-- !query schema
+struct<x:int>
+-- !query output
+0
+1
+
+
+-- !query
+from t
+|> from t
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'from'",
+    "hint" : ""
+  }
+}
+
+
 -- !query
 table t
 |> select 1 as x
@@ -3344,13 +3474,12 @@ struct<c_customer_id:string>
 
 -- !query
 with customer_total_return as
-  (table store_returns
+  (from store_returns
   |> join date_dim
   |> where sr_returned_date_sk = d_date_sk and d_year = 2000
   |> aggregate sum(sr_return_amt) as ctr_total_return
        group by sr_customer_sk as ctr_customer_sk, sr_store_sk as ctr_store_sk)
-table customer_total_return
-|> as ctr1
+from customer_total_return ctr1
 |> join store
 |> join customer
 |> where ctr1.ctr_total_return >
@@ -3696,13 +3825,12 @@ struct<rnk:int,best_performing:string,worst_performing:string>
 
 
 -- !query
-table store_sales
-|> as ss1
+from store_sales ss1
 |> where ss_store_sk = 4
 |> aggregate avg(ss_net_profit) rank_col
      group by ss_item_sk as item_sk
 |> where rank_col > 0.9 * (
-     table store_sales
+     from store_sales
      |> where ss_store_sk = 4
           and ss_addr_sk is null
      |> aggregate avg(ss_net_profit) rank_col
@@ -3717,8 +3845,7 @@ table store_sales
 |> where rnk < 11
 |> as asceding
 |> join (
-     table store_sales
-     |> as ss1
+     from store_sales ss1
      |> where ss_store_sk = 4
      |> aggregate avg(ss_net_profit) rank_col
           group by ss_item_sk as item_sk
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 36a003883a77..e698c50e5631 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -901,6 +901,9 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession {
       checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y")
       checkPipeSelect("TABLE t |> EXTEND X + 1 AS Y, X + 2 Z")
       checkPipeSelect("TABLE t |> EXTEND 1 AS z, 2 AS Z |> SET z = 1, Z = 2")
+      // FROM operators.
+      def checkPipeSelectFrom(query: String): Unit = check(query, Seq(PROJECT))
+      checkPipeSelectFrom("FROM t |> SELECT 1 AS X")
       // Basic WHERE operators.
       def checkPipeWhere(query: String): Unit = check(query, Seq(FILTER))
       checkPipeWhere("TABLE t |> WHERE X = 1")

