This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a97d3b9  [SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
a97d3b9 is described below
commit a97d3b9f4f4ddd215ecaa7f96c64aeba6e825f74
Author: Terry Kim <[email protected]>
AuthorDate: Fri Mar 27 12:48:14 2020 +0800
[SPARK-31204][SQL] HiveResult compatibility for DatasourceV2 command
### What changes were proposed in this pull request?
`HiveResult` performs some conversions for commands to be compatible with
Hive output, e.g.:
```
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
case ExecutedCommandExec(_: DescribeCommandBase) =>
...
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
```
This conversion is needed for DatasourceV2 commands as well, so this PR adds
the conversion for the v2 commands `SHOW TABLES` and `DESCRIBE TABLE`.
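Abridged from the diff below, the new v2 branches look like this:
```
case _: DescribeTableExec =>
  formatDescribeTableOutput(executedPlan.executeCollectPublic())
// SHOW TABLES in Hive only output table names while our v2 command outputs
// namespace and table name.
case command : ShowTablesExec =>
  command.executeCollect().map(_.getString(1))
```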
### Why are the changes needed?
This fixes a bug where the conversion is not applied to v2 commands.
### Does this PR introduce any user-facing change?
Yes, now the outputs for the v2 commands `SHOW TABLES` and `DESCRIBE TABLE` are
compatible with Hive output.
For example, with a table created as:
```
CREATE TABLE testcat.ns.tbl (id bigint COMMENT 'col1') USING foo
```
The output of `SHOW TABLES` has changed from
```
ns table
```
to
```
table
```
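A v2 `SHOW TABLES` row has the shape (namespace, tableName), so keeping only
column index 1 drops the namespace. A minimal sketch of that projection, using
a hand-built `Row` rather than the real `ShowTablesExec` output:
```
import org.apache.spark.sql.Row

// A v2 SHOW TABLES row: (namespace, tableName).
val rows = Seq(Row("ns", "table"))
// Keep only the table name, as the new conversion does.
val hiveStyle = rows.map(_.getString(1))  // Seq("table")
```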
And the output of `DESCRIBE TABLE` has changed, only in column layout (each
field is now left-justified to 20 characters and tab-separated, padding which
plain text collapses), from
```
id bigint col1
# Partitioning
Not partitioned
```
to
```
id bigint col1
# Partitioning
Not partitioned
```
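A minimal sketch of the per-row formatting (mirroring the
`formatDescribeTableOutput` helper added in the diff below):
```
// Left-justify each field to 20 characters and join with tabs,
// as the new formatDescribeTableOutput helper does.
val fields = Seq("id", "bigint", "col1")
val line = fields.map(s => String.format("%-20s", s)).mkString("\t")
// "id                  \tbigint              \tcol1                "
```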
### How was this patch tested?
Added unit tests.
Closes #28004 from imback82/hive_result.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/execution/HiveResult.scala | 29 +++++++++++++-------
.../spark/sql/execution/HiveResultSuite.scala | 32 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index ff820bf..21874bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -24,6 +24,7 @@ import java.time.{Instant, LocalDate}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +39,17 @@ object HiveResult {
    */
   def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
     case ExecutedCommandExec(_: DescribeCommandBase) =>
-      // If it is a describe command for a Hive table, we want to have the output format
-      // be similar with Hive.
-      executedPlan.executeCollectPublic().map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
+      formatDescribeTableOutput(executedPlan.executeCollectPublic())
+    case _: DescribeTableExec =>
+      formatDescribeTableOutput(executedPlan.executeCollectPublic())
+    // SHOW TABLES in Hive only output table names while our v1 command outputs
+    // database, table name, isTemp.
     case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
+    // SHOW TABLES in Hive only output table names while our v2 command outputs
+    // namespace and table name.
+    case command : ShowTablesExec =>
+      command.executeCollect().map(_.getString(1))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
       // We need the types so we can output struct field names
@@ -59,6 +59,15 @@ object HiveResult {
       .map(_.mkString("\t"))
   }
 
+  private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = {
+    rows.map {
+      case Row(name: String, dataType: String, comment) =>
+        Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse(""))
+          .map(s => String.format(s"%-20s", s))
+          .mkString("\t")
+    }
+  }
+
   private def zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
   private def dateFormatter = DateFormatter(zoneId)
   private def timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bf7cbaa..5e81c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.connector.InMemoryTableCatalog
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
 
 class HiveResultSuite extends SharedSparkSession {
@@ -68,4 +69,35 @@ class HiveResultSuite extends SharedSparkSession {
     val result = HiveResult.hiveResultString(executedPlan)
     assert(result.head === "0.00000000")
   }
+
+  test("SHOW TABLES in hive result") {
+    withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) {
+      Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach {
+        case (ns, tbl, source) =>
+          withTable(s"$ns.$tbl") {
+            spark.sql(s"CREATE TABLE $ns.$tbl (id bigint) USING $source")
+            val df = spark.sql(s"SHOW TABLES FROM $ns")
+            val executedPlan = df.queryExecution.executedPlan
+            assert(HiveResult.hiveResultString(executedPlan).head == tbl)
+          }
+      }
+    }
+  }
+
+  test("DESCRIBE TABLE in hive result") {
+    withSQLConf("spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName) {
+      Seq(("testcat.ns", "tbl", "foo"), ("spark_catalog.default", "tbl", "csv")).foreach {
+        case (ns, tbl, source) =>
+          withTable(s"$ns.$tbl") {
+            spark.sql(s"CREATE TABLE $ns.$tbl (id bigint COMMENT 'col1') USING $source")
+            val df = spark.sql(s"DESCRIBE $ns.$tbl")
+            val executedPlan = df.queryExecution.executedPlan
+            val expected = "id                  " +
+              "\tbigint              " +
+              "\tcol1                "
+            assert(HiveResult.hiveResultString(executedPlan).head == expected)
+          }
+      }
+    }
+  }
 }
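To try this locally, a rough sketch of how the new behavior can be driven
(the session setup here is illustrative only; the real tests above use
SharedSparkSession):
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.HiveResult

// Illustrative local session; not part of the patch.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
spark.sql("CREATE TABLE tbl (id bigint) USING parquet")
val plan = spark.sql("SHOW TABLES").queryExecution.executedPlan
// With this patch, only the table name is printed, as in Hive.
println(HiveResult.hiveResultString(plan).head)  // "tbl"
```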
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]