This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a47a30b59b0 [SPARK-54022][SPARK-56617][SQL][TEST] Add DSv2 CACHE 
TABLE impact on reads tests
5a47a30b59b0 is described below

commit 5a47a30b59b0c33a3e08bfc27be50884c9458d53
Author: Thang Long Vu <[email protected]>
AuthorDate: Thu May 28 20:17:07 2026 +0800

    [SPARK-54022][SPARK-56617][SQL][TEST] Add DSv2 CACHE TABLE impact on reads 
tests
    
    ### What changes were proposed in this pull request?
    
    Add 7 tests (covering 5 scenarios) verifying CACHE TABLE impact on reads 
for DSv2 tables. Write operations ignore the cache, so these tests verify how 
reads behave when a cached table is mutated by session SQL or external catalog 
API calls. Tests run in both classic and Connect modes.
    
    `DSv2CacheTableReadTests` extends `DSv2ExternalMutationTestBase`, following 
the same pattern as `DSv2TempViewWithStoredPlanTests` (PR #55571) and 
`DSv2RepeatedTableAccessTests` (PR #55583). The tests cover five scenarios from 
the design doc:
    
    - **Scenario 1 (external write)**: Cache pins the read, external write 
invisible until `REFRESH TABLE`.
    - **Scenario 2 (session write and more external changes)**: Session write 
rebuilds cache, subsequent external write invisible until `REFRESH TABLE`.
    - **Scenario 3 (external schema changes)**: Cache pinned at original 
schema, external ADD COLUMN invisible until `REFRESH TABLE`.
    - **Scenario 4 (session schema changes and more external changes)**: 
Session ALTER rebuilds cache with new schema, subsequent external write 
invisible until `REFRESH TABLE`.
    - **Scenario 5 (external drop and recreate table)**: Query sees the new 
empty table; with a caching connector, stale data is served until `REFRESH 
TABLE`.
    
    #### New files
    
    - **`DSv2CacheTableReadTests`**: Shared trait containing all 7 tests, with 
every `session.sql(...)` call followed by `.collect()` (harmless in classic 
mode, required for Connect).
    
    #### Modified files
    
    - **`DataSourceV2DataFrameSuite`**: Mixes in `DSv2CacheTableReadTests` 
alongside existing traits (classic runner, `testPrefix = ""`).
    - **`DataSourceV2DataFrameConnectSuite`**: Mixes in 
`DSv2CacheTableReadTests` alongside existing traits (Connect runner, 
`testPrefix = "[connect] "`).
    - **`DSv2ExternalMutationTestBase`**: Updated Scaladoc to reference all 
three consumer traits.
    - **`DSv2TempViewWithStoredPlanTests`**: Removed stale comment about 
inherited vals.
    
    ### Why are the changes needed?
    
    These tests document and lock down the expected CACHE TABLE behavior with 
DSv2 tables: cached reads are pinned against external mutations, and `REFRESH 
TABLE` invalidates the cache. This prevents regressions if cache invalidation 
logic changes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR is test-only.
    
    ### How was this patch tested?
    
    7 new tests run in both classic and Connect modes (14 total):
    
    Classic (175 total, all pass):
    ```
    build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite'
    ```
    
    Connect (35 total, all pass):
    ```
    build/sbt 'connect/testOnly *DataSourceV2DataFrameConnectSuite'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55536 from longvu-db/spark-dsv2-cache-scenario-5.
    
    Authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../DataSourceV2DataFrameConnectSuite.scala        |  13 +-
 .../sql/connector/DSv2CacheTableReadTests.scala    | 271 +++++++++++++++++++++
 .../connector/DSv2ExternalMutationTestBase.scala   |   3 +-
 .../DSv2TempViewWithStoredPlanTests.scala          |   2 -
 .../sql/connector/DataSourceV2DataFrameSuite.scala |   3 +-
 5 files changed, 282 insertions(+), 10 deletions(-)

diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index a13e953460a7..ce926e42ebad 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,21 +21,22 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.{DSv2RepeatedTableAccessTests, 
DSv2TempViewWithStoredPlanTests}
+import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, 
DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
 import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
InMemoryTableCatalog, TableCatalog}
 
 /**
  * Connect-mode counterpart of 
[[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
  *
- * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]) and 
repeated table access tests
- * ([[DSv2RepeatedTableAccessTests]]) under Spark Connect. All test logic 
lives in the shared
- * traits; this class only provides the Connect-specific session, catalog 
access, and result
- * comparison.
+ * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]), repeated 
table access tests
+ * ([[DSv2RepeatedTableAccessTests]]), and CACHE TABLE read tests 
([[DSv2CacheTableReadTests]])
+ * under Spark Connect. All test logic lives in the shared traits; this class 
only provides the
+ * Connect-specific session, catalog access, and result comparison.
  */
 class DataSourceV2DataFrameConnectSuite
     extends SparkConnectServerTest
     with DSv2TempViewWithStoredPlanTests
-    with DSv2RepeatedTableAccessTests {
+    with DSv2RepeatedTableAccessTests
+    with DSv2CacheTableReadTests {
 
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
new file mode 100644
index 000000000000..ac6ffcc6ecc0
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Shared CACHE TABLE impact on reads tests for DSv2 tables. Write operations 
ignore the
+ * cache, so these tests verify how reads behave when a cached table is 
mutated by session
+ * SQL or external catalog API calls:
+ *
+ *  - Scenario 1 (external write): cache pins the read, external write 
invisible until REFRESH.
+ *  - Scenario 2 (session write then external write): session write rebuilds 
cache,
+ *    subsequent external write invisible until REFRESH.
+ *  - Scenario 3 (external schema change): cache pinned at original schema 
until REFRESH.
+ *  - Scenario 4 (session schema change then external write): session ALTER 
rebuilds
+ *    cache, subsequent external write invisible until REFRESH.
+ *  - Scenario 5 (external drop and recreate table): query sees new empty 
table.
+ *
+ * Scenario 1 includes a `cachingcat` variant to verify the two-layer REFRESH 
behavior:
+ * [[org.apache.spark.sql.execution.datasources.v2.RefreshTableExec]] calls 
both
+ * `invalidateTable` (clearing the connector cache) and the CacheManager 
rebuild, so external
+ * writes become visible after REFRESH even with a caching connector. 
Scenarios 2 through 4
+ * omit `cachingcat` because the CacheManager pins reads regardless of the 
connector, making
+ * the observable behavior the same. Scenario 5 (drop and recreate) includes a 
`cachingcat`
+ * variant because it differs: [[CachingInMemoryTableCatalog]] does not 
invalidate on
+ * drop/create, so `loadTable` still returns the old cached table object, 
CacheManager still
+ * matches, and stale data is served until REFRESH TABLE.
+ *
+ * Drop and recreate is tested only as an external mutation: session DROP 
TABLE automatically
+ * uncaches the table (via the CacheManager), making a session drop+recreate 
scenario trivially
+ * different from the external variant.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect 
client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode 
`.collect()` on
+ * DDL / DML is a no-op (these execute eagerly), so this is harmless.
+ */
+trait DSv2CacheTableReadTests extends DSv2ExternalMutationTestBase {
+
+  private def assertTableCached(session: SparkSession, tableName: String): 
Unit =
+    assert(session.catalog.isCached(tableName))
+
+  test(s"${testPrefix}SPARK-54022: cached table pinned against external data 
write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        session.table(testTable).cache()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        session.sql(s"REFRESH TABLE $testTable").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table pinned, " +
+      "REFRESH clears both layers") {
+    withTestSession { session =>
+      withTestTableAndViews(session, cachingTestTable) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+
+        session.table(cachingTestTable).cache()
+        assertTableCached(session, cachingTestTable)
+        checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+        val catalog =
+          getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        // Both CacheManager and connector cache are stale: external write 
invisible
+        assertTableCached(session, cachingTestTable)
+        checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+        // REFRESH TABLE calls invalidateTable (clears connector cache) and 
rebuilds
+        // the CacheManager entry, so the external write becomes visible.
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+        assertTableCached(session, cachingTestTable)
+        checkRows(session.table(cachingTestTable), Seq(Row(1, 100), Row(2, 
200)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: session write invalidates cache, " +
+      "then external write invisible") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        session.table(testTable).cache()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(3, 300))
+
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+
+        session.sql(s"REFRESH TABLE $testTable").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200), 
Row(3, 300)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: cached table pinned against external schema 
change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        session.table(testTable).cache()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
+        catalog.alterTable(testIdent, addCol)
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        session.sql(s"REFRESH TABLE $testTable").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200, 
-1)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: session schema change invalidates cache, " +
+      "external write invisible") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        session.table(testTable).cache()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column 
INT").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100, null)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100, null)))
+
+        session.sql(s"REFRESH TABLE $testTable").collect()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200, 
-1)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: cached table after external drop and " +
+      "recreate sees empty table") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        session.table(testTable).cache()
+        assertTableCached(session, testTable)
+        checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val originalTableId = catalog.loadTable(testIdent).id
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        val newTableId = catalog.loadTable(testIdent).id
+        assert(originalTableId != newTableId)
+
+        val result = session.table(testTable)
+        assert(result.schema.fieldNames.toSeq == Seq("id", "salary"))
+        checkRows(result, Seq.empty)
+
+        // External drop+recreate produces a new table identity, so the prior 
cache entry
+        // is unreachable via name lookup (unlike external write/schema change 
where the
+        // cache stays pinned).
+        assert(!session.catalog.isCached(testTable))
+
+        session.sql(s"REFRESH TABLE $testTable").collect()
+        checkRows(session.table(testTable), Seq.empty)
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table stale 
after " +
+      "external drop and recreate") {
+    withTestSession { session =>
+      withTestTableAndViews(session, cachingTestTable) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+
+        session.table(cachingTestTable).cache()
+        assertTableCached(session, cachingTestTable)
+        checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+        val catalog =
+          getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+        val originalTableId = catalog.loadTable(testIdent).id
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        // CachingInMemoryTableCatalog does not invalidate on drop/create, so 
loadTable
+        // still returns the old cached table object. CacheManager still 
matches and
+        // serves the stale cached data.
+        assertTableCached(session, cachingTestTable)
+        checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+        // REFRESH TABLE calls invalidateTable (clears connector cache) and 
rebuilds
+        // the CacheManager entry, so the new empty table becomes visible.
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+        checkRows(session.table(cachingTestTable), Seq.empty)
+      }
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 4d16339e09de..4591dca7abaf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.connector.catalog.{BufferedRows, 
CatalogV2Util, Iden
  * is a Connect client and catalog access requires the server session).
  *
  * Concrete suites override the abstract methods and mix in a test trait such 
as
- * [[DSv2TempViewWithStoredPlanTests]] or [[DSv2RepeatedTableAccessTests]].
+ * [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedTableAccessTests]],
+ * or [[DSv2CacheTableReadTests]].
  */
 trait DSv2ExternalMutationTestBase extends QueryTest {
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
index 1a5229258e7d..9f8a93e30550 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -33,8 +33,6 @@ import org.apache.spark.sql.types.{IntegerType, LongType, 
StringType}
  */
 trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
 
-  // Uses testTable, cachingTestTable, and testIdent from 
DSv2ExternalMutationTestBase.
-
   // Scenario 1.1 (session write)
   test(s"${testPrefix}temp view with stored plan reflects session write") {
     withTestSession { session =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 139a6c75d793..372be8cfe308 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -48,7 +48,8 @@ import org.apache.spark.unsafe.types.UTF8String
 class DataSourceV2DataFrameSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests 
= false)
   with DSv2TempViewWithStoredPlanTests
-  with DSv2RepeatedTableAccessTests {
+  with DSv2RepeatedTableAccessTests
+  with DSv2CacheTableReadTests {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to