This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a9f01cb05e [SPARK-55622][SQL][TESTS] Add test for DSV2 Tables with multi-part names on SessionCatalog
52a9f01cb05e is described below

commit 52a9f01cb05ec06bdf334fe469b72b39f5c10516
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Feb 24 23:59:07 2026 +0800

    [SPARK-55622][SQL][TESTS] Add test for DSV2 Tables with multi-part names on SessionCatalog
    
    ### What changes were proposed in this pull request?
    Add a unit test for Iceberg's pattern of supporting multi-part identifiers in SessionCatalog (for metadata tables). Add a fake metadata table to InMemoryDataSource.
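    
    For context, a minimal sketch (illustrative only; the names mirror the patch's loadSnapshotTable below) of how a multi-part metadata table identifier decomposes:
    
        import org.apache.spark.sql.connector.catalog.Identifier
    
        // `default.tbl.snapshots` reaches the session catalog as
        // namespace = ["default", "tbl"], name = "snapshots".
        val ident = Identifier.of(Array("default", "tbl"), "snapshots")
    
        // Peel off the last namespace element as the parent table name,
        // mirroring Iceberg's db.table.snapshots layout.
        val parentName = ident.namespace().last          // "tbl"
        val parentNs = ident.namespace().dropRight(1)    // Array("default")
        val parentIdent = Identifier.of(parentNs, parentName)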
    
    ### Why are the changes needed?
    It increases Spark test coverage to catch issues like https://github.com/apache/spark/pull/54247.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Ran the added test
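    
    For reference, the modified suites can be run locally with the standard Spark sbt workflow (not stated in the PR; `sql` is the sbt project for the sql/core module):
    
        build/sbt "sql/testOnly *DataSourceV2SQLSessionCatalogSuite *DataSourceV2DataFrameSessionCatalogSuite"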
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, Cursor with Claude 4.5 Opus
    
    Closes #54411 from szehon-ho/add_test.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../sql/connector/catalog/InMemoryTable.scala      | 72 +++++++++++++++++++++-
 .../DataSourceV2DataFrameSessionCatalogSuite.scala | 24 +++++++-
 .../DataSourceV2SQLSessionCatalogSuite.scala       | 32 ++++++++++
 3 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 3bea136b34d4..d5738475031d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.connector.catalog
 import java.util
 import java.util.{Objects, UUID}
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
+import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwrite, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
@@ -261,3 +263,71 @@ object InMemoryTable {
     }
   }
 }
+
+/**
+ * A metadata table that returns snapshot (commit) information for a parent table.
+ * Simulates data source tables with multi-part identifiers, e.g. Iceberg's db.table.snapshots.
+ */
+class InMemorySnapshotsTable(parentTable: InMemoryTable) extends Table with SupportsRead {
+  override def name(): String = parentTable.name + ".snapshots"
+
+  override def schema(): StructType = StructType(Seq(
+    StructField("committed_at", LongType, nullable = false),
+    StructField("snapshot_id", LongType, nullable = false)
+  ))
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(TableCapability.BATCH_READ)
+  }
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new InMemorySnapshotsScanBuilder(parentTable)
+  }
+}
+
+class InMemorySnapshotsScanBuilder(parentTable: InMemoryTable) extends ScanBuilder {
+  override def build(): Scan = new InMemorySnapshotsScan(parentTable)
+}
+
+class InMemorySnapshotsScan(parentTable: InMemoryTable) extends Scan with Batch {
+  override def readSchema(): StructType = StructType(Seq(
+    StructField("committed_at", LongType, nullable = false),
+    StructField("snapshot_id", LongType, nullable = false)
+  ))
+
+  override def toBatch: Batch = this
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    Array(InMemorySnapshotsPartition(parentTable.commits.toSeq.map(c => (c.id, c.id))))
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new InMemorySnapshotsReaderFactory()
+  }
+}
+
+case class InMemorySnapshotsPartition(snapshots: Seq[(Long, Long)]) extends InputPartition
+
+class InMemorySnapshotsReaderFactory extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    new InMemorySnapshotsReader(partition.asInstanceOf[InMemorySnapshotsPartition])
+  }
+}
+
+class InMemorySnapshotsReader(partition: InMemorySnapshotsPartition)
+    extends PartitionReader[InternalRow] {
+  private var index = -1
+  private val snapshots = partition.snapshots
+
+  override def next(): Boolean = {
+    index += 1
+    index < snapshots.size
+  }
+
+  override def get(): InternalRow = {
+    val (committedAt, snapshotId) = snapshots(index)
+    InternalRow(committedAt, snapshotId)
+  }
+
+  override def close(): Unit = {}
+}
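
For illustration only (not part of the patch): how Spark's scan machinery would drive the reader added above, assuming these classes are on the test classpath.

    val factory = new InMemorySnapshotsReaderFactory()
    val reader = factory.createReader(InMemorySnapshotsPartition(Seq((1L, 1L), (2L, 2L))))
    try {
      // Spark calls next()/get() until the partition is exhausted.
      while (reader.next()) {
        val row = reader.get()
        println(s"committed_at=${row.getLong(0)}, snapshot_id=${row.getLong(1)}")
      }
    } finally {
      reader.close()
    }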
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index bc6ceeb24593..c3a24765fc2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.connector
 
+import java.util.Locale
+
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.connector.catalog._
@@ -111,7 +113,25 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
     val identToUse = Option(InMemoryTableSessionCatalog.customIdentifierResolution)
       .map(_(ident))
       .getOrElse(ident)
-    super.loadTable(identToUse)
+
+    // For single-part namespaces, follow Iceberg's pattern: first try to load the table
+    // normally, falling back to metadata table resolution only on AnalysisException.
+    try {
+      super.loadTable(identToUse)
+    } catch {
+      case _: AnalysisException if identToUse.name().toLowerCase(Locale.ROOT) == "snapshots" =>
+        loadSnapshotTable(identToUse)
+    }
+  }
+
+  private def loadSnapshotTable(ident: Identifier): InMemorySnapshotsTable = {
+    val parentTableName = ident.namespace().last
+    val parentNamespace = ident.namespace().dropRight(1)
+    val parentIdent = Identifier.of(
+      if (parentNamespace.isEmpty) Array("default") else parentNamespace,
+      parentTableName)
+    val parentTable = super.loadTable(parentIdent).asInstanceOf[InMemoryTable]
+    new InMemorySnapshotsTable(parentTable)
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
index dcc49b252fdb..9c3b429e829b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
@@ -86,4 +86,36 @@ class DataSourceV2SQLSessionCatalogSuite
       sql("SELECT char_length('Hello') as v1, ns.strlen('Spark') as v2"),
       Row(5, 5))
   }
+
+  test("SPARK-55024: data source metadata tables with multi-part identifiers") {
+    // This tests querying data source tables with multi-part identifiers,
+    // using the example of Iceberg's metadata tables, e.g. db.table.snapshots.
+    val t1 = "metadata_test_tbl"
+
+    def verifySnapshots(snapshots: DataFrame, expectedCount: Int, queryDesc: String): Unit = {
+      assert(snapshots.count() == expectedCount,
+        s"$queryDesc: expected $expectedCount snapshots")
+      assert(snapshots.schema.fieldNames.toSeq == Seq("committed_at", "snapshot_id"),
+        s"$queryDesc: expected schema [committed_at, snapshot_id], " +
+          s"got: ${snapshots.schema.fieldNames.toSeq}")
+      val snapshotIds = snapshots.select("snapshot_id").collect().map(_.getLong(0))
+      assert(snapshotIds.forall(_ > 0),
+        s"$queryDesc: all snapshot IDs should be positive, got: ${snapshotIds.toSeq}")
+    }
+
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      sql(s"INSERT INTO $t1 VALUES (1, 'first')")
+      sql(s"INSERT INTO $t1 VALUES (2, 'second')")
+      sql(s"INSERT INTO $t1 VALUES (3, 'third')")
+
+      Seq(
+        s"$t1.snapshots",
+        s"default.$t1.snapshots",
+        s"spark_catalog.default.$t1.snapshots"
+      ).foreach { snapshotTable =>
+        verifySnapshots(sql(s"SELECT * FROM $snapshotTable"), 3, snapshotTable)
+      }
+    }
+  }
 }
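
Note (illustrative, not from the patch): the three query forms in the test all reach the same lookup. For `spark_catalog.default.metadata_test_tbl.snapshots` the catalog name is stripped first, leaving namespace ["default", "metadata_test_tbl"] and name "snapshots"; the bare `metadata_test_tbl.snapshots` form arrives with namespace ["metadata_test_tbl"], which is why loadSnapshotTable falls back to the "default" namespace when the parent namespace is empty.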

