This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 810a547e20e6 [SPARK-52109][SQL] Add listTableSummaries API to Data Source V2 Table Catalog API
810a547e20e6 is described below
commit 810a547e20e616bc3bd89e1caa8e2062818b5971
Author: Uros Stankovic <[email protected]>
AuthorDate: Mon May 19 17:20:14 2025 +0800
[SPARK-52109][SQL] Add listTableSummaries API to Data Source V2 Table Catalog API
### What changes were proposed in this pull request?
- Add a new API to the DSv2 `TableCatalog` interface, called `listTableSummaries`, which returns an array of `TableSummary` objects.
- Added a reserved table type property that represents the type of a v2 table.
- When a v1 table is converted to a v2 table, the table type property is also attached to the v2 table.
- `JDBCTableCatalog` overrides the new API to invoke `listTables` and treat all tables as foreign. This way, only one SQL command against the remote system is needed to get the table summaries (a sketch of this single-round-trip pattern follows this list).
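For illustration only (not part of this commit), here is a minimal Scala sketch of that single-round-trip pattern. The `RemoteClient` trait and its `listTablesWithTypes` call are hypothetical stand-ins for a backend that can return name/type pairs in one RPC; only the summary-building step is shown, not a full `TableCatalog` implementation:
```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableSummary}

// Hypothetical backend client: one RPC returns (table name, table type) pairs.
trait RemoteClient {
  def listTablesWithTypes(namespace: Array[String]): Seq[(String, String)]
}

// Builds TableSummary objects from a single backend call, the way an
// implementor of listTableSummaries can avoid per-table loadTable round trips.
def summariesInOneCall(client: RemoteClient, namespace: Array[String]): Array[TableSummary] =
  client.listTablesWithTypes(namespace).map { case (name, tableType) =>
    TableSummary.of(Identifier.of(namespace, name), tableType)
  }.toArray
```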
### Why are the changes needed?
Since the DSv2 `Table` class can represent different table types, we currently need to do `listTables` + `loadTable` operations, which can be expensive. This PR adds a new API that lists table summaries, a smaller amount of data that is still sufficient for the SHOW TABLES command.
Many remote systems (and implementors of the DSv2 `TableCatalog`) can implement the newly added API with just one RPC. A caller-side sketch follows.
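As a caller-side sketch (again illustrative, not code from this commit; `catalog` stands for any `TableCatalog` implementation), the old and new patterns compare as follows:
```scala
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableSummary}

def printTableTypes(catalog: TableCatalog, namespace: Array[String]): Unit = {
  // Old pattern: one loadTable (often one RPC) per table just to learn its type.
  val viaLoadTable: Array[TableSummary] = catalog.listTables(namespace).map { ident =>
    val tableType = catalog.loadTable(ident).properties().getOrDefault(
      TableCatalog.PROP_TABLE_TYPE, TableSummary.FOREIGN_TABLE_TYPE)
    TableSummary.of(ident, tableType)
  }

  // New pattern: one call; catalogs that can answer in a single RPC override it.
  val summaries: Array[TableSummary] = catalog.listTableSummaries(namespace)
  summaries.foreach(s => println(s"${s.identifier()} -> ${s.tableType()}"))
}
```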
### Does this PR introduce _any_ user-facing change?
A new API called `listTableSummaries` is added to `TableCatalog`.
### How was this patch tested?
Added new test cases in existing suites.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50886 from urosstan-db/SPARK-52109-Add-list-table-summaries-API-to-Table-catalog.
Lead-authored-by: Uros Stankovic <[email protected]>
Co-authored-by: Uros Stankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/connector/catalog/TableCatalog.java | 35 +++++++++++++++++
.../spark/sql/connector/catalog/TableSummary.java | 44 ++++++++++++++++++++++
.../sql/connector/catalog/CatalogV2Util.scala | 3 +-
.../spark/sql/connector/catalog/V1Table.scala | 16 ++++++++
.../spark/sql/connector/catalog/CatalogSuite.scala | 27 +++++++++++++
.../datasources/v2/jdbc/JDBCTableCatalog.scala | 8 +++-
.../datasources/v2/V2SessionCatalogSuite.scala | 29 +++++++++++++-
.../v2/jdbc/JDBCTableCatalogSuite.scala | 23 +++++++++++
8 files changed, 182 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index f2cbafbe8e5b..ff1170a9b1a8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -26,6 +26,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors;
import org.apache.spark.sql.errors.QueryExecutionErrors;
import org.apache.spark.sql.types.StructType;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -62,6 +63,11 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_EXTERNAL = "external";
+ /**
+ * A reserved property that indicates the table entity type (external, managed, view, etc.).
+ */
+ String PROP_TABLE_TYPE = "table_type";
+
/**
* A reserved property to specify the description of the table.
*/
@@ -103,6 +109,35 @@ public interface TableCatalog extends CatalogPlugin {
*/
Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
+ /**
+ * List the table summaries in a namespace from the catalog.
+ * <p>
+ * This method should return all table entities from a catalog regardless of type (i.e. views
+ * should be listed as well).
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of table summaries
+ * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+ * @throws NoSuchTableException If a table listed by the listTables API does not exist.
+ */
+ default TableSummary[] listTableSummaries(String[] namespace)
+ throws NoSuchNamespaceException, NoSuchTableException {
+ Identifier[] tableIdentifiers = this.listTables(namespace);
+ ArrayList<TableSummary> tableSummaries = new ArrayList<>(tableIdentifiers.length);
+ for (Identifier identifier : tableIdentifiers) {
+ Table table = this.loadTable(identifier);
+
+ // If the table type property is not present, we assume the table type is `FOREIGN`.
+ String tableType = table.properties().getOrDefault(
+ TableCatalog.PROP_TABLE_TYPE,
+ TableSummary.FOREIGN_TABLE_TYPE);
+
+ tableSummaries.add(TableSummary.of(identifier, tableType));
+ }
+
+ return tableSummaries.toArray(TableSummary[]::new);
+ }
+
/**
* Load table metadata by {@link Identifier identifier} from the catalog.
* <p>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java
new file mode 100644
index 000000000000..e08f83b98319
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Evolving
+public interface TableSummary {
+ String MANAGED_TABLE_TYPE = "MANAGED";
+ String EXTERNAL_TABLE_TYPE = "EXTERNAL";
+ String VIEW_TABLE_TYPE = "VIEW";
+ String FOREIGN_TABLE_TYPE = "FOREIGN";
+
+ Identifier identifier();
+ String tableType();
+
+ static TableSummary of(Identifier identifier, String tableType) {
+ return new TableSummaryImpl(identifier, tableType);
+ }
+}
+
+record TableSummaryImpl(Identifier identifier, String tableType) implements TableSummary {
+ TableSummaryImpl {
+ checkNotNull(identifier, "Identifier of a table summary object cannot be null");
+ checkNotNull(tableType, "Table type of a table summary object cannot be null");
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index d6fa7f58d61c..e0def4d4aab0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -59,7 +59,8 @@ private[sql] object CatalogV2Util {
TableCatalog.PROP_PROVIDER,
TableCatalog.PROP_OWNER,
TableCatalog.PROP_EXTERNAL,
- TableCatalog.PROP_IS_MANAGED_LOCATION)
+ TableCatalog.PROP_IS_MANAGED_LOCATION,
+ TableCatalog.PROP_TABLE_TYPE)
/**
* The list of reserved namespace properties, which can not be removed or changed directly by
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
index 570ab1338dbf..eee6ddf3e58f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
@@ -79,6 +79,8 @@ private[sql] object V1Table {
def addV2TableProperties(v1Table: CatalogTable): Map[String, String] = {
val external = v1Table.tableType == CatalogTableType.EXTERNAL
val managed = v1Table.tableType == CatalogTableType.MANAGED
+ val tableTypeProperties: Option[(String, String)] = getV2TableType(v1Table)
+ .map(tableType => TableCatalog.PROP_TABLE_TYPE -> tableType)
v1Table.properties ++
v1Table.storage.properties.map { case (key, value) =>
@@ -91,8 +93,22 @@ private[sql] object V1Table {
} ++
(if (managed) Some(TableCatalog.PROP_IS_MANAGED_LOCATION -> "true") else None) ++
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
+ tableTypeProperties ++
Some(TableCatalog.PROP_OWNER -> v1Table.owner)
}
+
+ /**
+ * Returns the v2 table type that should be part of the v2 table properties.
+ * If there is no mapping between the v1 table type and a v2 table type, then None is returned.
+ */
+ private def getV2TableType(v1Table: CatalogTable): Option[String] = {
+ v1Table.tableType match {
+ case CatalogTableType.EXTERNAL => Some(TableSummary.EXTERNAL_TABLE_TYPE)
+ case CatalogTableType.MANAGED => Some(TableSummary.MANAGED_TABLE_TYPE)
+ case CatalogTableType.VIEW => Some(TableSummary.VIEW_TABLE_TYPE)
+ case _ => None
+ }
+ }
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index d15fd064000c..178c43258816 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -110,6 +110,33 @@ class CatalogSuite extends SparkFunSuite {
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
}
+ test("listTableSummaries") {
+ val catalog = newCatalog()
+ val ident1 = Identifier.of(Array("ns"), "test_table_1")
+ val ident2 = Identifier.of(Array("ns"), "test_table_2")
+
+ intercept[NoSuchNamespaceException](catalog.listTables(Array("ns")))
+
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withPartitions(emptyTrans)
+ .withProperties(emptyProps).build()
+ catalog.createTable(ident1, tableInfo)
+
+ assertResult(
+ Set(TableSummary.of(ident1, TableSummary.FOREIGN_TABLE_TYPE))
+ )(catalog.listTableSummaries(Array("ns")).toSet)
+
+ catalog.createTable(ident2, columns, emptyTrans, emptyProps)
+
+ assertResult(
+ Set(
+ TableSummary.of(ident1, TableSummary.FOREIGN_TABLE_TYPE),
+ TableSummary.of(ident2, TableSummary.FOREIGN_TABLE_TYPE)
+ )
+ )(catalog.listTableSummaries(Array("ns")).toSet)
+ }
+
test("createTable: non-partitioned table") {
val catalog = newCatalog()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index b46223db6abb..7ea2ca1b793b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, TableSummary}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryExecutionErrors}
@@ -84,6 +84,12 @@ class JDBCTableCatalog extends TableCatalog
}
}
+ override def listTableSummaries(namespace: Array[String]): Array[TableSummary] = {
+ // Each table from the remote database system is treated as a foreign table.
+ this.listTables(namespace)
+ .map(identifier => TableSummary.of(identifier, TableSummary.FOREIGN_TABLE_TYPE))
+ }
+
override def tableExists(ident: Identifier): Boolean = {
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 3900d6eb97eb..19d8cba25308 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, TableSummary, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType, TimestampType}
@@ -121,6 +121,33 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
catalog.dropTable(ident3)
}
+ test("listTableSummaries") {
+ val namespace = Array("ns")
+ val catalog = newCatalog()
+ val identManaged = Identifier.of(namespace, "managed_table")
+ val identExternal = Identifier.of(namespace, "external_table")
+
+ assert(catalog.listTableSummaries(namespace).isEmpty)
+
+ val externalTableProperties = Map(
+ TableCatalog.PROP_EXTERNAL -> "1",
+ TableCatalog.PROP_LOCATION -> "file://"
+ )
+
+ catalog.createTable(identManaged, columns, emptyTrans, emptyProps)
+ catalog.createTable(identExternal, columns, emptyTrans, externalTableProperties.asJava)
+
+ val tableSummaries = catalog.listTableSummaries(namespace).toSet
+ val expectedTableSummaries = Set(
+ TableSummary.of(identManaged, TableSummary.MANAGED_TABLE_TYPE),
+ TableSummary.of(identExternal, TableSummary.EXTERNAL_TABLE_TYPE)
+ )
+ assertResult(expectedTableSummaries)(tableSummaries)
+
+ catalog.dropTable(identManaged)
+ catalog.dropTable(identExternal)
+ }
+
test("createTable") {
val catalog = newCatalog()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 45a463822cd1..a6019da61f17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2.jdbc
import java.sql.{Connection, DriverManager}
import java.util.Properties
+import scala.jdk.CollectionConverters._
+
import org.apache.logging.log4j.Level
import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
@@ -26,17 +28,29 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.catalog.{Identifier, TableSummary}
import org.apache.spark.sql.errors.DataTypeErrors.{toSQLConf, toSQLStmt}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val tempDir = Utils.createTempDir()
val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+ private val tableCatalog: JDBCTableCatalog = {
+ val catalog = new JDBCTableCatalog()
+ val catalogOptions = Map(
+ "url" -> url,
+ "driver" -> "org.h2.Driver"
+ )
+
+ catalog.initialize("jdbc_table_catalog", new CaseInsensitiveStringMap(catalogOptions.asJava))
+ catalog
+ }
object JdbcClientTypes {
val CHAR = "CHARACTER"
@@ -152,6 +166,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
}
}
+ test("list table summary") {
+ val tableSummaries = tableCatalog.listTableSummaries(Array("test"))
+ val expectedTable = TableSummary.of(
+ Identifier.of(Array("test"), "people"),
+ TableSummary.FOREIGN_TABLE_TYPE
+ )
+ assertResult(Array(expectedTable))(tableSummaries)
+ }
+
test("load a table") {
val t = spark.table("h2.test.people")
val expectedSchema = new StructType()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]