This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 52a9f01cb05e [SPARK-55622][SQL][TESTS] Add test for DSV2 Tables with
multi-part names on SessionCatalog
52a9f01cb05e is described below
commit 52a9f01cb05ec06bdf334fe469b72b39f5c10516
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Feb 24 23:59:07 2026 +0800
[SPARK-55622][SQL][TESTS] Add test for DSV2 Tables with multi-part names on
SessionCatalog
### What changes were proposed in this pull request?
Add a unit test for Iceberg's case of supporting multi-part identifiers in
SessionCatalog (for metadata tables). Add a fake metadata table to
InMemoryDataSource.
### Why are the changes needed?
It can increase Spark test coverage to catch issues like:
https://github.com/apache/spark/pull/54247
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran the added test
### Was this patch authored or co-authored using generative AI tooling?
Yes, cursor claude 4.5 opus
Closes #54411 from szehon-ho/add_test.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../sql/connector/catalog/InMemoryTable.scala | 72 +++++++++++++++++++++-
.../DataSourceV2DataFrameSessionCatalogSuite.scala | 24 +++++++-
.../DataSourceV2SQLSessionCatalogSuite.scala | 32 ++++++++++
3 files changed, 125 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 3bea136b34d4..d5738475031d 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.connector.catalog
import java.util
import java.util.{Objects, UUID}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.distributions.{Distribution,
Distributions}
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
+import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
SupportsOverwrite, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
@@ -261,3 +263,71 @@ object InMemoryTable {
}
}
}
+
/**
 * A metadata table exposing snapshot (commit) information for a parent table.
 * Simulates data sources that use multi-part table identifiers, e.g. Iceberg's
 * `db.table.snapshots`.
 */
class InMemorySnapshotsTable(parentTable: InMemoryTable) extends Table with SupportsRead {

  // The metadata table's name is derived from its parent, mirroring Iceberg's naming.
  override def name(): String = s"${parentTable.name}.snapshots"

  override def schema(): StructType = StructType(
    StructField("committed_at", LongType, nullable = false) ::
    StructField("snapshot_id", LongType, nullable = false) ::
    Nil)

  // Read-only metadata table: batch scans only.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new InMemorySnapshotsScanBuilder(parentTable)
}
+
/** Builds scans over the parent table's snapshot metadata; no pushdown is supported. */
class InMemorySnapshotsScanBuilder(parentTable: InMemoryTable) extends ScanBuilder {
  override def build(): Scan = new InMemorySnapshotsScan(parentTable)
}
+
/**
 * Scan over the parent table's commit history. All snapshots are served from a
 * single input partition, since the in-memory commit list is small.
 */
class InMemorySnapshotsScan(parentTable: InMemoryTable) extends Scan with Batch {

  override def readSchema(): StructType = StructType(
    StructField("committed_at", LongType, nullable = false) ::
    StructField("snapshot_id", LongType, nullable = false) ::
    Nil)

  override def toBatch: Batch = this

  override def planInputPartitions(): Array[InputPartition] = {
    // The commit id is used for both columns; tests only need deterministic values.
    val snapshotRows = parentTable.commits.toSeq.map { commit => (commit.id, commit.id) }
    Array(InMemorySnapshotsPartition(snapshotRows))
  }

  override def createReaderFactory(): PartitionReaderFactory =
    new InMemorySnapshotsReaderFactory()
}
+
/** Carries the (committed_at, snapshot_id) pairs for one snapshot-scan partition. */
case class InMemorySnapshotsPartition(snapshots: Seq[(Long, Long)]) extends InputPartition
+
/** Creates readers for [[InMemorySnapshotsPartition]]s. */
class InMemorySnapshotsReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    // Pattern match instead of asInstanceOf: fails with a descriptive error rather
    // than a bare ClassCastException if an unexpected partition type is passed in.
    partition match {
      case p: InMemorySnapshotsPartition => new InMemorySnapshotsReader(p)
      case other =>
        throw new IllegalArgumentException(
          s"Unexpected partition type: ${other.getClass.getName}")
    }
  }
}
+
/** Iterates over the snapshot rows of a single [[InMemorySnapshotsPartition]]. */
class InMemorySnapshotsReader(partition: InMemorySnapshotsPartition)
  extends PartitionReader[InternalRow] {
  private val snapshots = partition.snapshots
  // Cursor starts before the first row; next() advances it before each get().
  private var cursor = -1

  override def next(): Boolean = {
    cursor += 1
    cursor < snapshots.length
  }

  override def get(): InternalRow = {
    val (committedAt, snapshotId) = snapshots(cursor)
    InternalRow(committedAt, snapshotId)
  }

  override def close(): Unit = {}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index bc6ceeb24593..c3a24765fc2b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.connector
+import java.util.Locale
+
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.connector.catalog._
@@ -111,7 +113,25 @@ class InMemoryTableSessionCatalog extends
TestV2SessionCatalogBase[InMemoryTable
val identToUse =
Option(InMemoryTableSessionCatalog.customIdentifierResolution)
.map(_(ident))
.getOrElse(ident)
- super.loadTable(identToUse)
+
+ // For single-part namespaces, follow Iceberg's pattern: first try to load
the table
+ // normally, fall back to metadata table resolution only on
NoSuchTableException
+ try {
+ super.loadTable(identToUse)
+ } catch {
+ case _: AnalysisException if identToUse.name().toLowerCase(Locale.ROOT)
== "snapshots" =>
+ loadSnapshotTable(identToUse)
+ }
+ }
+
+ private def loadSnapshotTable(ident: Identifier): InMemorySnapshotsTable = {
+ val parentTableName = ident.namespace().last
+ val parentNamespace = ident.namespace().dropRight(1)
+ val parentIdent = Identifier.of(
+ if (parentNamespace.isEmpty) Array("default") else parentNamespace,
+ parentTableName)
+ val parentTable = super.loadTable(parentIdent).asInstanceOf[InMemoryTable]
+ new InMemorySnapshotsTable(parentTable)
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
index dcc49b252fdb..9c3b429e829b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
@@ -86,4 +86,36 @@ class DataSourceV2SQLSessionCatalogSuite
sql("SELECT char_length('Hello') as v1, ns.strlen('Spark') as v2"),
Row(5, 5))
}
+
+ test("SPARK-55024: data source metadata tables with multi-part identifiers")
{
+ // This test querying data source tables with multi part identifiers,
+ // with the example of Iceberg's metadata table, eg db.table.snapshots
+ val t1 = "metadata_test_tbl"
+
+ def verifySnapshots(snapshots: DataFrame, expectedCount: Int, queryDesc:
String): Unit = {
+ assert(snapshots.count() == expectedCount,
+ s"$queryDesc: expected $expectedCount snapshots")
+ assert(snapshots.schema.fieldNames.toSeq == Seq("committed_at",
"snapshot_id"),
+ s"$queryDesc: expected schema [committed_at, snapshot_id], " +
+ s"got: ${snapshots.schema.fieldNames.toSeq}")
+ val snapshotIds =
snapshots.select("snapshot_id").collect().map(_.getLong(0))
+ assert(snapshotIds.forall(_ > 0),
+ s"$queryDesc: all snapshot IDs should be positive, got:
${snapshotIds.toSeq}")
+ }
+
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ sql(s"INSERT INTO $t1 VALUES (1, 'first')")
+ sql(s"INSERT INTO $t1 VALUES (2, 'second')")
+ sql(s"INSERT INTO $t1 VALUES (3, 'third')")
+
+ Seq(
+ s"$t1.snapshots",
+ s"default.$t1.snapshots",
+ s"spark_catalog.default.$t1.snapshots"
+ ).foreach { snapshotTable =>
+ verifySnapshots(sql(s"SELECT * FROM $snapshotTable"), 3, snapshotTable)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]