This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd986e8 [SPARK-36588] Migrate SHOW TABLES to use V2 command by default
dd986e8 is described below
commit dd986e87fc0069102d64c68826f30cfb2e103d08
Author: Terry Kim <[email protected]>
AuthorDate: Mon Oct 11 19:28:32 2021 +0800
[SPARK-36588] Migrate SHOW TABLES to use V2 command by default
### What changes were proposed in this pull request?
This PR proposes to use V2 commands by default, as outlined in
[SPARK-36588](https://issues.apache.org/jira/browse/SPARK-36588); specifically,
it migrates `SHOW TABLES` to use the V2 command by default.
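Concretely, `SHOW TABLES` against the session catalog is now planned as the V2
`ShowTablesExec` node instead of the V1 `ShowTablesCommand`. A minimal sketch of
how to observe this, assuming a `SparkSession` named `spark` (mirroring the
assertions in the updated `QueryExecutionSuite`):

```scala
import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.datasources.v2.ShowTablesExec

// Commands are eagerly executed and wrapped in a CommandResultExec;
// the wrapped command plan should now be the V2 ShowTablesExec.
val qe = spark.sql("SHOW TABLES").queryExecution
val resultExec = qe.executedPlan.asInstanceOf[CommandResultExec]
assert(resultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
```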
### Why are the changes needed?
It's been a while since we introduced the V2 commands, and it seems
reasonable to use V2 commands by default even for the session catalog, with a
legacy config to fall back to the V1 commands.
### Does this PR introduce _any_ user-facing change?
No. Users can fall back to the V1 command by setting
`spark.sql.legacy.useV1Command` to `true`.
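For example, a minimal sketch of the fallback, assuming a `SparkSession` named `spark`:

```scala
// Opt back into the legacy V1 SHOW TABLES command via the new config.
spark.conf.set("spark.sql.legacy.useV1Command", "true")
spark.sql("SHOW TABLES").show()
```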
### How was this patch tested?
Added unit tests.
Closes #34137 from imback82/SPARK-36588-show-tables.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 2 ++
.../sql/catalyst/analysis/KeepLegacyOutputs.scala} | 26 +++++++++++++-------
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++
.../catalyst/analysis/ResolveSessionCatalog.scala | 2 +-
.../execution/datasources/v2/ShowTablesExec.scala | 11 +++++++--
.../datasources/v2/V2SessionCatalog.scala | 6 ++++-
.../spark/sql/execution/QueryExecutionSuite.scala | 10 +++-----
.../execution/command/ShowTablesSuiteBase.scala | 20 +++++++++-------
.../sql/execution/command/v1/ShowTablesSuite.scala | 28 ++++++++++++++++++++--
.../org/apache/spark/sql/test/SQLTestUtils.scala | 11 +++++++++
.../hive/execution/command/ShowTablesSuite.scala | 2 ++
11 files changed, 97 insertions(+), 31 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0f90159..3ec50b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -262,6 +262,8 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveHints.ResolveCoalesceHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
+ Batch("Keep Legacy Outputs", Once,
+ KeepLegacyOutputs),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions(v1SessionCatalog) ::
ResolveNamespace(catalogManager) ::
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala
similarity index 51%
copy from sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala
copy to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala
index b6db9a3..baee2bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala
@@ -15,19 +15,27 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive.execution.command
+package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.execution.command.v1
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTables}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.internal.SQLConf
/**
- * The class contains tests for the `SHOW TABLES` command to check V1 Hive external table catalog.
+ * A rule for keeping the SQL command's legacy outputs.
*/
-class ShowTablesSuite extends v1.ShowTablesSuiteBase with CommandSuiteBase {
- test("hive client calls") {
- withNamespaceAndTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id int) $defaultUsing")
- checkHiveClientCalls(expected = 3) {
- sql(s"SHOW TABLES IN $catalog.ns")
+object KeepLegacyOutputs extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
+ plan
+ } else {
+ plan.resolveOperatorsUpWithPruning(
+ _.containsPattern(COMMAND)) {
+ case s: ShowTables =>
+ assert(s.output.length == 3)
+ val newOutput = s.output.head.withName("database") +: s.output.tail
+ s.copy(output = newOutput)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 98aad1c..1dc4a8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3394,6 +3394,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_USE_V1_COMMAND =
+ buildConf("spark.sql.legacy.useV1Command")
+ .internal()
+ .doc("When true, Spark will use legacy V1 SQL commands.")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(false)
+
/**
* Holds information about keys that have been deprecated.
*
@@ -4115,6 +4123,8 @@ class SQLConf extends Serializable with Logging {
def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
+ def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5dacc16..e17a879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -269,7 +269,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) =>
DropDatabaseCommand(db, d.ifExists, d.cascade)
- case ShowTables(DatabaseInSessionCatalog(db), pattern, output) =>
+ case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
assert(output.length == 3)
output.head.withName("database") +: output.tail
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
index b624e62..cde3dfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
@@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
-import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.LeafExecNode
/**
@@ -40,10 +40,17 @@ case class ShowTablesExec(
val tables = catalog.listTables(namespace.toArray)
tables.map { table =>
if (pattern.map(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty).getOrElse(true)) {
- rows += toCatalystRow(table.namespace().quoted, table.name(), false)
+ rows += toCatalystRow(table.namespace().quoted, table.name(), isTempView(table))
}
}
rows.toSeq
}
+
+ private def isTempView(ident: Identifier): Boolean = {
+ catalog match {
+ case s: V2SessionCatalog => s.isTempView(ident)
+ case _ => false
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 33b8f22..1202498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -53,7 +53,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
case Array(db) =>
catalog
.listTables(db)
- .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
+ .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
.toArray
case _ =>
throw QueryCompilationErrors.noSuchNamespaceError(namespace)
@@ -277,6 +277,10 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.noSuchNamespaceError(namespace)
}
+ def isTempView(ident: Identifier): Boolean = {
+ catalog.isTempView(ident.namespace() :+ ident.name())
+ }
+
override def toString: String = s"V2SessionCatalog($name)"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 82bc22f3..86261b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.execution.command.{ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.execution.datasources.v2.ShowTablesExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -247,9 +247,7 @@ class QueryExecutionSuite extends SharedSparkSession {
assert(showTablesQe.commandExecuted.isInstanceOf[CommandResult])
assert(showTablesQe.executedPlan.isInstanceOf[CommandResultExec])
val showTablesResultExec = showTablesQe.executedPlan.asInstanceOf[CommandResultExec]
- assert(showTablesResultExec.commandPhysicalPlan.isInstanceOf[ExecutedCommandExec])
- assert(showTablesResultExec.commandPhysicalPlan.asInstanceOf[ExecutedCommandExec]
- .cmd.isInstanceOf[ShowTablesCommand])
+ assert(showTablesResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
val project = Project(showTables.output, SubqueryAlias("s", showTables))
val projectQe = qe(project)
@@ -260,8 +258,6 @@ class QueryExecutionSuite extends SharedSparkSession {
assert(projectQe.commandExecuted.children(0).children(0).isInstanceOf[CommandResult])
assert(projectQe.executedPlan.isInstanceOf[CommandResultExec])
val cmdResultExec = projectQe.executedPlan.asInstanceOf[CommandResultExec]
- assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ExecutedCommandExec])
- assert(cmdResultExec.commandPhysicalPlan.asInstanceOf[ExecutedCommandExec]
- .cmd.isInstanceOf[ShowTablesCommand])
+ assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala
index a01adb8..5f56b91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala
@@ -111,17 +111,19 @@ trait ShowTablesSuiteBase extends QueryTest with DDLCommandTestUtils {
}
test("change current catalog and namespace with USE statements") {
- withNamespaceAndTable("ns", "table") { t =>
- sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
+ withCurrentCatalogAndNamespace {
+ withNamespaceAndTable("ns", "table") { t =>
+ sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
- sql(s"USE $catalog")
- // No table is matched since the current namespace is not ["ns"]
- assert(defaultNamespace != Seq("ns"))
- runShowTablesSql("SHOW TABLES", Seq())
+ sql(s"USE $catalog")
+ // No table is matched since the current namespace is not ["ns"]
+ assert(defaultNamespace != Seq("ns"))
+ runShowTablesSql("SHOW TABLES", Seq())
- // Update the current namespace to match "ns.tbl".
- sql(s"USE $catalog.ns")
- runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false)))
+ // Update the current namespace to match "ns.tbl".
+ sql(s"USE $catalog.ns")
+ runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false)))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala
index f47493b..23b9b54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.command.v1
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.execution.command
@@ -32,6 +35,26 @@ import org.apache.spark.sql.internal.SQLConf
*/
trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
override def defaultNamespace: Seq[String] = Seq("default")
+ var _version: String = ""
+ override def version: String = _version
+
+ // Tests using V1 catalogs will run with `spark.sql.legacy.useV1Command` on and off
+ // to test both V1 and V2 commands.
+ override def test(testName: String, testTags: Tag*)(testFun: => Any)
+ (implicit pos: Position): Unit = {
+ Seq(true, false).foreach { useV1Command =>
+ _version = if (useV1Command) {
+ "using V1 catalog with V1 command"
+ } else {
+ "using V1 catalog with V2 command"
+ }
+ super.test(testName, testTags: _*) {
+ withSQLConf(SQLConf.LEGACY_USE_V1_COMMAND.key -> useV1Command.toString) {
+ testFun
+ }
+ }
+ }
+ }
private def withSourceViews(f: => Unit): Unit = {
withTable("source", "source2") {
@@ -84,7 +107,7 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
false -> "PARTITION(YEAR = 2015, Month = 1)"
).foreach { case (caseSensitive, partitionSpec) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
- val df = sql(s"SHOW TABLE EXTENDED LIKE 'part_table' $partitionSpec")
+ val df = sql(s"SHOW TABLE EXTENDED IN ns LIKE 'part_table'
$partitionSpec")
val information = df.select("information").first().getString(0)
assert(information.contains("Partition Values: [year=2015, month=1]"))
}
@@ -129,7 +152,6 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
}
}
-
test("show table in a not existing namespace") {
val msg = intercept[NoSuchDatabaseException] {
runShowTablesSql(s"SHOW TABLES IN $catalog.unknown", Seq())
@@ -143,6 +165,8 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
* The class contains tests for the `SHOW TABLES` command to check V1 In-Memory table catalog.
*/
class ShowTablesSuite extends ShowTablesSuiteBase with CommandSuiteBase {
+ override def version: String = super[ShowTablesSuiteBase].version
+
test("SPARK-33670: show partitions from a datasource table") {
import testImplicits._
withNamespace(s"$catalog.ns") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ba0b599..ae42541 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -393,6 +393,17 @@ private[sql] trait SQLTestUtilsBase
}
/**
+ * Restores the current catalog/database after calling `f`.
+ */
+ protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = {
+ val curCatalog = sql("select current_catalog()").head().getString(0)
+ val curDatabase = sql("select current_database()").head().getString(0)
+ Utils.tryWithSafeFinally(f) {
+ spark.sql(s"USE $curCatalog.$curDatabase")
+ }
+ }
+
+ /**
* Enables Locale `language` before executing `f`, then switches back to the default locale of JVM
* after `f` returns.
*/
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala
index b6db9a3..6050618 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.execution.command.v1
* The class contains tests for the `SHOW TABLES` command to check V1 Hive external table catalog.
*/
class ShowTablesSuite extends v1.ShowTablesSuiteBase with CommandSuiteBase {
+ override def version: String = super[ShowTablesSuiteBase].version
+
test("hive client calls") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int) $defaultUsing")