Repository: spark Updated Branches: refs/heads/master f405de87c -> 24bea0004
[SPARK-14954] [SQL] Add PARTITIONED BY and CLUSTERED BY clauses to the data source CTAS syntax Currently, persisted partitioned and/or bucketed data source tables can be created only through the Dataset API, not through SQL DDL. This PR implements the following syntax to add partitioning and bucketing support to the SQL DDL: ``` CREATE TABLE <table-name> USING <provider> [OPTIONS (<key1> <value1>, <key2> <value2>, ...)] [PARTITIONED BY (col1, col2, ...)] [CLUSTERED BY (col1, col2, ...) [SORTED BY (col1, col2, ...)] INTO <n> BUCKETS] AS SELECT ... ``` Test cases are added in `MetastoreDataSourcesSuite` to check the newly added syntax. Author: Cheng Lian <[email protected]> Author: Yin Huai <[email protected]> Closes #12734 from liancheng/spark-14954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24bea000 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24bea000 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24bea000 Branch: refs/heads/master Commit: 24bea000476cdd0b43be5160a76bc5b170ef0b42 Parents: f405de8 Author: Cheng Lian <[email protected]> Authored: Wed Apr 27 13:55:07 2016 -0700 Committer: Yin Huai <[email protected]> Committed: Wed Apr 27 13:55:13 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../spark/sql/execution/SparkSqlParser.scala | 12 ++- .../sql/hive/MetastoreDataSourcesSuite.scala | 93 ++++++++++++++++++++ 3 files changed, 106 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 6e04f6e..c356f0c 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -47,7 +47,9 @@ statement | createTableHeader ('(' colTypeList ')')? tableProvider (OPTIONS tablePropertyList)? #createTableUsing | createTableHeader tableProvider - (OPTIONS tablePropertyList)? AS? query #createTableUsing + (OPTIONS tablePropertyList)? + (PARTITIONED BY partitionColumnNames=identifierList)? + bucketSpec? AS? query #createTableUsing | createTableHeader ('(' columns=colTypeList ')')? (COMMENT STRING)? (PARTITIONED BY '(' partitionColumns=colTypeList ')')? http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 79fdf9f..e4c837a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -289,6 +289,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText + val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) if (ctx.query != null) { // Get the backing query. 
@@ -302,9 +303,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } else { SaveMode.ErrorIfExists } - CreateTableUsingAsSelect(table, provider, temp, Array.empty, None, mode, options, query) + + val partitionColumnNames = + Option(ctx.partitionColumnNames) + .map(visitIdentifierList(_).toArray) + .getOrElse(Array.empty[String]) + + CreateTableUsingAsSelect( + table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query) } else { - val struct = Option(ctx.colTypeList).map(createStructType) + val struct = Option(ctx.colTypeList()).map(createStructType) CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/24bea000/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b21ca4f..cb10002 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -940,4 +940,97 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) }) } } + + test("CTAS: persisted partitioned data source table") { + withTempDir { dir => + withTable("t") { + val path = dir.getCanonicalPath + + sql( + s"""CREATE TABLE t USING PARQUET + |OPTIONS (PATH '$path') + |PARTITIONED BY (a) + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + ) + + val metastoreTable = sharedState.externalCatalog.getTable("default", "t") + assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1) + 
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets")) + assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols")) + assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols")) + + checkAnswer(table("t"), Row(2, 1)) + } + } + } + + test("CTAS: persisted bucketed data source table") { + withTempDir { dir => + withTable("t") { + val path = dir.getCanonicalPath + + sql( + s"""CREATE TABLE t USING PARQUET + |OPTIONS (PATH '$path') + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + ) + + val metastoreTable = sharedState.externalCatalog.getTable("default", "t") + assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols")) + assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) + assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) + assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1) + + checkAnswer(table("t"), Row(1, 2)) + } + + withTable("t") { + val path = dir.getCanonicalPath + + sql( + s"""CREATE TABLE t USING PARQUET + |OPTIONS (PATH '$path') + |CLUSTERED BY (a) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + ) + + val metastoreTable = sharedState.externalCatalog.getTable("default", "t") + assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols")) + assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) + assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) + assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols")) + + checkAnswer(table("t"), Row(1, 2)) + } + } + } + + test("CTAS: persisted partitioned bucketed data source table") { + withTempDir { dir => + withTable("t") { + val path = dir.getCanonicalPath + + sql( + s"""CREATE TABLE t USING PARQUET + |OPTIONS (PATH '$path') + |PARTITIONED 
BY (a) + |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b, 3 AS c + """.stripMargin + ) + + val metastoreTable = sharedState.externalCatalog.getTable("default", "t") + assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1) + assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) + assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) + assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1) + + checkAnswer(table("t"), Row(2, 3, 1)) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
