Repository: spark
Updated Branches:
refs/heads/master aab99d31a -> 8a12580d2
[SPARK-14127][SQL] "DESC <table>": Extracts schema information from table
properties for data source tables
## What changes were proposed in this pull request?
This is a follow-up to #12934 and #12844. This PR adds a set of utility methods
in `DDLUtils` to help extract schema information (user-defined schema,
partition columns, and bucketing information) from data source table
properties. These utility methods are then used in `DescribeTableCommand` to
refine the output for data source tables. Before this PR, the aforementioned schema
information was only shown as table properties, which are hard to read.
Sample output:
```
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|a                           |bigint                                                   |       |
|b                           |bigint                                                   |       |
|c                           |bigint                                                   |       |
|d                           |bigint                                                   |       |
|# Partition Information     |                                                         |       |
|# col_name                  |                                                         |       |
|d                           |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Database:                   |default                                                  |       |
|Owner:                      |lian                                                     |       |
|Create Time:                |Tue May 10 03:20:34 PDT 2016                             |       |
|Last Access Time:           |Wed Dec 31 16:00:00 PST 1969                             |       |
|Location:                   |file:/Users/lian/local/src/spark/workspace-a/target/...  |       |
|Table Type:                 |MANAGED                                                  |       |
|Table Parameters:           |                                                         |       |
|  rawDataSize               |-1                                                       |       |
|  numFiles                  |1                                                        |       |
|  transient_lastDdlTime     |1462875634                                               |       |
|  totalSize                 |684                                                      |       |
|  spark.sql.sources.provider|parquet                                                  |       |
|  EXTERNAL                  |FALSE                                                    |       |
|  COLUMN_STATS_ACCURATE     |false                                                    |       |
|  numRows                   |-1                                                       |       |
|                            |                                                         |       |
|# Storage Information       |                                                         |       |
|SerDe Library:              |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       |       |
|InputFormat:                |org.apache.hadoop.mapred.SequenceFileInputFormat         |       |
|OutputFormat:               |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat|       |
|Compressed:                 |No                                                       |       |
|Num Buckets:                |2                                                        |       |
|Bucket Columns:             |[b]                                                      |       |
|Sort Columns:               |[c]                                                      |       |
|Storage Desc Parameters:    |                                                         |       |
|  path                      |file:/Users/lian/local/src/spark/workspace-a/target/...  |       |
|  serialization.format      |1                                                        |       |
+----------------------------+---------------------------------------------------------+-------+
```
## How was this patch tested?
Test cases are added in `HiveDDLSuite` to check command output.
Author: Cheng Lian <[email protected]>
Closes #13025 from liancheng/spark-14127-extract-schema-info.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a12580d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a12580d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a12580d
Branch: refs/heads/master
Commit: 8a12580d25b1ce5abc45c600483fad69f90ca333
Parents: aab99d3
Author: Cheng Lian <[email protected]>
Authored: Tue May 10 09:00:53 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 10 09:00:53 2016 -0700
----------------------------------------------------------------------
.../spark/sql/execution/command/ddl.scala | 81 ++++++++++++++++++--
.../spark/sql/execution/command/tables.scala | 66 ++++++++++++----
.../spark/sql/hive/execution/HiveDDLSuite.scala | 56 +++++++++++++-
3 files changed, 183 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8a12580d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 085bdaf..0b0b618 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition,
CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
-import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types._
@@ -457,7 +455,6 @@ case class AlterTableSetLocation(
}
Seq.empty[Row]
}
-
}
@@ -489,9 +486,83 @@ private[sql] object DDLUtils {
case _ =>
})
}
+
def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumns.size > 0 ||
+ table.partitionColumns.nonEmpty ||
table.properties.contains("spark.sql.sources.schema.numPartCols")
}
-}
+ def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType]
= {
+ getSchemaFromTableProperties(metadata.properties)
+ }
+
+ // A persisted data source table may not store its schema in the catalog. In
this case, its schema
+ // will be inferred at runtime when the table is referenced.
+ def getSchemaFromTableProperties(props: Map[String, String]):
Option[StructType] = {
+ require(isDatasourceTable(props))
+
+ val schemaParts = for {
+ numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+ index <- 0 until numParts.toInt
+ } yield props.getOrElse(
+ s"spark.sql.sources.schema.part.$index",
+ throw new AnalysisException(
+ s"Corrupted schema in catalog: $numParts parts expected, but part
$index is missing."
+ )
+ )
+
+ if (schemaParts.isEmpty) {
+ None
+ } else {
+ Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+ }
+ }
+
+ private def getColumnNamesByTypeFromTableProperties(
+ props: Map[String, String], colType: String, typeName: String):
Seq[String] = {
+ require(isDatasourceTable(props))
+
+ for {
+ numCols <-
props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+ index <- 0 until numCols.toInt
+ } yield props.getOrElse(
+ s"spark.sql.sources.schema.${colType}Col.$index",
+ throw new AnalysisException(
+ s"Corrupted $typeName in catalog: $numCols parts expected, but part
$index is missing."
+ )
+ )
+ }
+
+ def getPartitionColumnsFromTableProperties(metadata: CatalogTable):
Seq[String] = {
+ getPartitionColumnsFromTableProperties(metadata.properties)
+ }
+
+ def getPartitionColumnsFromTableProperties(props: Map[String, String]):
Seq[String] = {
+ getColumnNamesByTypeFromTableProperties(props, "part", "partitioning
columns")
+ }
+
+ def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
+ getNumBucketFromTableProperties(metadata.properties)
+ }
+
+ def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int]
= {
+ require(isDatasourceTable(props))
+ props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
+ }
+
+ def getBucketingColumnsFromTableProperties(metadata: CatalogTable):
Seq[String] = {
+ getBucketingColumnsFromTableProperties(metadata.properties)
+ }
+
+ def getBucketingColumnsFromTableProperties(props: Map[String, String]):
Seq[String] = {
+ getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing
columns")
+ }
+
+ def getSortingColumnsFromTableProperties(metadata: CatalogTable):
Seq[String] = {
+ getSortingColumnsFromTableProperties(metadata.properties)
+ }
+
+ def getSortingColumnsFromTableProperties(props: Map[String, String]):
Seq[String] = {
+ getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/8a12580d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 954dcca..0f90715 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -309,12 +309,29 @@ case class DescribeTableCommand(table: TableIdentifier,
isExtended: Boolean, isF
// Shows data columns and partitioned columns (if any)
private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- describeSchema(table.schema, buffer)
+ if (DDLUtils.isDatasourceTable(table)) {
+ val schema = DDLUtils.getSchemaFromTableProperties(table)
- if (table.partitionColumns.nonEmpty) {
- append(buffer, "# Partition Information", "", "")
- append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
- describeSchema(table.partitionColumns, buffer)
+ if (schema.isEmpty) {
+ append(buffer, "# Schema of this table is inferred at runtime", "", "")
+ } else {
+ schema.foreach(describeSchema(_, buffer))
+ }
+
+ val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table)
+ if (partCols.nonEmpty) {
+ append(buffer, "# Partition Information", "", "")
+ append(buffer, s"# ${output.head.name}", "", "")
+ partCols.foreach(col => append(buffer, col, "", ""))
+ }
+ } else {
+ describeSchema(table.schema, buffer)
+
+ if (table.partitionColumns.nonEmpty) {
+ append(buffer, "# Partition Information", "", "")
+ append(buffer, s"# ${output.head.name}", output(1).name,
output(2).name)
+ describeSchema(table.partitionColumns, buffer)
+ }
}
}
@@ -338,26 +355,47 @@ case class DescribeTableCommand(table: TableIdentifier,
isExtended: Boolean, isF
append(buffer, "Table Type:", table.tableType.name, "")
append(buffer, "Table Parameters:", "", "")
- table.properties.foreach { case (key, value) =>
+ table.properties.filterNot {
+ // Hides schema properties that hold user-defined schema, partition
columns, and bucketing
+ // information since they are already extracted and shown in other parts.
+ case (key, _) => key.startsWith("spark.sql.sources.schema")
+ }.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
+ describeStorageInfo(table, buffer)
+ }
+
+ private def describeStorageInfo(metadata: CatalogTable, buffer:
ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Storage Information", "", "")
- table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:",
serdeLib, ""))
- table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:",
format, ""))
- table.storage.outputFormat.foreach(format => append(buffer,
"OutputFormat:", format, ""))
- append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else
"No", "")
- append(buffer, "Num Buckets:", table.numBuckets.toString, "")
- append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ",
", "]"), "")
- append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ",
"]"), "")
+ metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe
Library:", serdeLib, ""))
+ metadata.storage.inputFormat.foreach(format => append(buffer,
"InputFormat:", format, ""))
+ metadata.storage.outputFormat.foreach(format => append(buffer,
"OutputFormat:", format, ""))
+ append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else
"No", "")
+ describeBucketingInfo(metadata, buffer)
append(buffer, "Storage Desc Parameters:", "", "")
- table.storage.serdeProperties.foreach { case (key, value) =>
+ metadata.storage.serdeProperties.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
}
+ private def describeBucketingInfo(metadata: CatalogTable, buffer:
ArrayBuffer[Row]): Unit = {
+ if (DDLUtils.isDatasourceTable(metadata)) {
+ val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
+ val bucketCols =
DDLUtils.getBucketingColumnsFromTableProperties(metadata)
+ val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
+ append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""),
"")
+ append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"),
"")
+ append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
+ } else {
+ append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
+ append(buffer, "Bucket Columns:",
metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
+ append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[",
", ", "]"), "")
+ }
+ }
+
private def describeSchema(schema: Seq[CatalogColumn], buffer:
ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase,
column.comment.orNull)
http://git-wip-us.apache.org/repos/asf/spark/blob/8a12580d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a8ba952..0f23949 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase,
CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -531,4 +531,58 @@ class HiveDDLSuite
.exists(_.getString(0) == "# Detailed Table Information"))
}
}
+
+ test("desc table for data source table - no user-defined schema") {
+ withTable("t1") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(1).write.parquet(path)
+ sql(s"CREATE TABLE t1 USING parquet OPTIONS (PATH '$path')")
+
+ val desc = sql("DESC FORMATTED t1").collect().toSeq
+
+ assert(desc.contains(Row("# Schema of this table is inferred at
runtime", "", "")))
+ }
+ }
+ }
+
+ test("desc table for data source table - partitioned bucketed table") {
+ withTable("t1") {
+ sqlContext
+ .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
+ .bucketBy(2, "b").sortBy("c").partitionBy("d")
+ .saveAsTable("t1")
+
+ val formattedDesc = sql("DESC FORMATTED t1").collect()
+
+ assert(formattedDesc.containsSlice(
+ Seq(
+ Row("a", "bigint", ""),
+ Row("b", "bigint", ""),
+ Row("c", "bigint", ""),
+ Row("d", "bigint", ""),
+ Row("# Partition Information", "", ""),
+ Row("# col_name", "", ""),
+ Row("d", "", ""),
+ Row("", "", ""),
+ Row("# Detailed Table Information", "", ""),
+ Row("Database:", "default", "")
+ )
+ ))
+
+ assert(formattedDesc.containsSlice(
+ Seq(
+ Row("Table Type:", "MANAGED", "")
+ )
+ ))
+
+ assert(formattedDesc.containsSlice(
+ Seq(
+ Row("Num Buckets:", "2", ""),
+ Row("Bucket Columns:", "[b]", ""),
+ Row("Sort Columns:", "[c]", "")
+ )
+ ))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]