This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5a47a30b59b0 [SPARK-54022][SPARK-56617][SQL][TEST] Add DSv2 CACHE
TABLE impact on reads tests
5a47a30b59b0 is described below
commit 5a47a30b59b0c33a3e08bfc27be50884c9458d53
Author: Thang Long Vu <[email protected]>
AuthorDate: Thu May 28 20:17:07 2026 +0800
[SPARK-54022][SPARK-56617][SQL][TEST] Add DSv2 CACHE TABLE impact on reads
tests
### What changes were proposed in this pull request?
Add 7 tests (5 scenarios, plus `cachingcat` variants for scenarios 1 and 5)
verifying CACHE TABLE impact on reads for DSv2 tables. Write operations ignore
the cache, so these tests verify how reads behave when a cached table is
mutated by session SQL or external catalog API calls. Tests run in both
classic and Connect modes.
`DSv2CacheTableReadTests` extends `DSv2ExternalMutationTestBase`, following
the same pattern as `DSv2TempViewWithStoredPlanTests` (PR #55571) and
`DSv2RepeatedTableAccessTests` (PR #55583). The tests cover five scenarios from
the design doc:
- **Scenario 1 (external write)**: Cache pins the read, external write
invisible until `REFRESH TABLE`.
- **Scenario 2 (session write and more external changes)**: Session write
rebuilds cache, subsequent external write invisible until `REFRESH TABLE`.
- **Scenario 3 (external schema changes)**: Cache pinned at original
schema, external ADD COLUMN invisible until `REFRESH TABLE`.
- **Scenario 4 (session schema changes and more external changes)**:
Session ALTER rebuilds cache with new schema, subsequent external write
invisible until `REFRESH TABLE`.
- **Scenario 5 (external drop and recreate table)**: Query sees the new
empty table; with a caching connector (`cachingcat`), stale cached data is
served until `REFRESH TABLE`.
#### New files
- **`DSv2CacheTableReadTests`**: Shared trait containing all 7 tests, using
`session.sql(...)` with `.collect()` calls (harmless in classic mode, required
for Connect).
#### Modified files
- **`DataSourceV2DataFrameSuite`**: Mixes in `DSv2CacheTableReadTests`
alongside existing traits (classic runner, `testPrefix = ""`).
- **`DataSourceV2DataFrameConnectSuite`**: Mixes in
`DSv2CacheTableReadTests` alongside existing traits (Connect runner,
`testPrefix = "[connect] "`).
- **`DSv2ExternalMutationTestBase`**: Updated Scaladoc to reference all
three consumer traits.
- **`DSv2TempViewWithStoredPlanTests`**: Removed stale comment about
inherited vals.
### Why are the changes needed?
These tests document and lock down the expected CACHE TABLE behavior with
DSv2 tables: cached reads are pinned against external mutations, and `REFRESH
TABLE` invalidates the cache. This prevents regressions if cache invalidation
logic changes.
### Does this PR introduce _any_ user-facing change?
No. This PR is test-only.
### How was this patch tested?
7 new tests run in both classic and Connect modes (14 total):
Classic (175 total, all pass):
```
build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite'
```
Connect (35 total, all pass):
```
build/sbt 'connect/testOnly *DataSourceV2DataFrameConnectSuite'
```
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55536 from longvu-db/spark-dsv2-cache-scenario-5.
Authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../DataSourceV2DataFrameConnectSuite.scala | 13 +-
.../sql/connector/DSv2CacheTableReadTests.scala | 271 +++++++++++++++++++++
.../connector/DSv2ExternalMutationTestBase.scala | 3 +-
.../DSv2TempViewWithStoredPlanTests.scala | 2 -
.../sql/connector/DataSourceV2DataFrameSuite.scala | 3 +-
5 files changed, 282 insertions(+), 10 deletions(-)
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index a13e953460a7..ce926e42ebad 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,21 +21,22 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.{DSv2RepeatedTableAccessTests,
DSv2TempViewWithStoredPlanTests}
+import org.apache.spark.sql.connector.{DSv2CacheTableReadTests,
DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
InMemoryTableCatalog, TableCatalog}
/**
* Connect-mode counterpart of
[[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
*
- * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]) and
repeated table access tests
- * ([[DSv2RepeatedTableAccessTests]]) under Spark Connect. All test logic
lives in the shared
- * traits; this class only provides the Connect-specific session, catalog
access, and result
- * comparison.
+ * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]), repeated
table access tests
+ * ([[DSv2RepeatedTableAccessTests]]), and CACHE TABLE read tests
([[DSv2CacheTableReadTests]])
+ * under Spark Connect. All test logic lives in the shared traits; this class
only provides the
+ * Connect-specific session, catalog access, and result comparison.
*/
class DataSourceV2DataFrameConnectSuite
extends SparkConnectServerTest
with DSv2TempViewWithStoredPlanTests
- with DSv2RepeatedTableAccessTests {
+ with DSv2RepeatedTableAccessTests
+ with DSv2CacheTableReadTests {
override def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
new file mode 100644
index 000000000000..ac6ffcc6ecc0
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2CacheTableReadTests.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
Column, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Shared CACHE TABLE impact on reads tests for DSv2 tables. Write operations
ignore the
+ * cache, so these tests verify how reads behave when a cached table is
mutated by session
+ * SQL or external catalog API calls:
+ *
+ * - Scenario 1 (external write): cache pins the read, external write
invisible until REFRESH.
+ * - Scenario 2 (session write then external write): session write rebuilds
cache,
+ * subsequent external write invisible until REFRESH.
+ * - Scenario 3 (external schema change): cache pinned at original schema
until REFRESH.
+ * - Scenario 4 (session schema change then external write): session ALTER
rebuilds
+ * cache, subsequent external write invisible until REFRESH.
+ * - Scenario 5 (external drop and recreate table): query sees the new empty table;
+ *   with `cachingcat`, stale cached data is served until REFRESH.
+ *
+ * Scenario 1 includes a `cachingcat` variant to verify the two-layer REFRESH
behavior:
+ * [[org.apache.spark.sql.execution.datasources.v2.RefreshTableExec]] calls
both
+ * `invalidateTable` (clearing the connector cache) and the CacheManager
rebuild, so external
+ * writes become visible after REFRESH even with a caching connector.
Scenarios 2 through 4
+ * omit `cachingcat` because the CacheManager pins reads regardless of the
connector, making
+ * the observable behavior the same. Scenario 5 (drop and recreate) includes a
`cachingcat`
+ * variant because it differs: [[CachingInMemoryTableCatalog]] does not
invalidate on
+ * drop/create, so `loadTable` still returns the old cached table object,
CacheManager still
+ * matches, and stale data is served until REFRESH TABLE.
+ *
+ * For drop+recreate, only the external variant is tested. Session DROP TABLE automatically
+ * uncaches the table (via the CacheManager), so a session drop+recreate would simply read
+ * the new, uncached table and would not exercise the cache-pinning behavior tested here.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect
client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode
`.collect()` on
+ * DDL / DML is a no-op (these execute eagerly), so this is harmless.
+ */
+trait DSv2CacheTableReadTests extends DSv2ExternalMutationTestBase {
+
+ /** Asserts that `tableName` is cached, as reported by `session.catalog.isCached`. */
+ private def assertTableCached(session: SparkSession, tableName: String):
Unit =
+ assert(session.catalog.isCached(tableName))
+
+ // Scenario 1 (external write)
+ test(s"${testPrefix}SPARK-54022: cached table pinned against external data
write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ session.table(testTable).cache()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200))
+
+ // The external append bypasses the session: the table stays cached and
+ // reads still return only the originally cached row.
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ // After REFRESH TABLE the table is still cached and the external row is visible.
+ session.sql(s"REFRESH TABLE $testTable").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 1 with a caching connector (cachingcat)
+ test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table pinned, " +
+ "REFRESH clears both layers") {
+ withTestSession { session =>
+ withTestTableAndViews(session, cachingTestTable) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+
+ session.table(cachingTestTable).cache()
+ assertTableCached(session, cachingTestTable)
+ checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+ val catalog =
+ getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200))
+
+ // Both CacheManager and connector cache are stale: external write
invisible
+ assertTableCached(session, cachingTestTable)
+ checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+ // REFRESH TABLE calls invalidateTable (clears connector cache) and
rebuilds
+ // the CacheManager entry, so the external write becomes visible.
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+ assertTableCached(session, cachingTestTable)
+ checkRows(session.table(cachingTestTable), Seq(Row(1, 100), Row(2,
200)))
+ }
+ }
+ }
+
+ // Scenario 2 (session write then external write)
+ test(s"${testPrefix}SPARK-54022: session write invalidates cache, " +
+ "then external write invisible") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ session.table(testTable).cache()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ // A write issued through the session is reflected in the cached read,
+ // and the table remains cached afterwards.
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(3, 300))
+
+ // The subsequent external write is not visible until REFRESH TABLE.
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200)))
+
+ session.sql(s"REFRESH TABLE $testTable").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100), Row(2, 200),
Row(3, 300)))
+ }
+ }
+ }
+
+ // Scenario 3 (external schema change)
+ test(s"${testPrefix}SPARK-54022: cached table pinned against external schema
change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ session.table(testTable).cache()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ // Add an INT column directly through the catalog (outside the session),
+ // then append a row that populates it.
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog.alterTable(testIdent, addCol)
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200, -1))
+
+ // Reads stay pinned to the original two-column cached data.
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ // After REFRESH the new column is visible; the pre-existing row reads it as null.
+ session.sql(s"REFRESH TABLE $testTable").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200,
-1)))
+ }
+ }
+ }
+
+ // Scenario 4 (session schema change then external write)
+ test(s"${testPrefix}SPARK-54022: session schema change invalidates cache, " +
+ "external write invisible") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ session.table(testTable).cache()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ // A session ALTER is reflected in the cached read (new column, null for the
+ // existing row), and the table remains cached.
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column
INT").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100, null)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200, -1))
+
+ // The external write after the ALTER is not visible until REFRESH TABLE.
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100, null)))
+
+ session.sql(s"REFRESH TABLE $testTable").collect()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100, null), Row(2, 200,
-1)))
+ }
+ }
+ }
+
+ // Scenario 5 (external drop and recreate)
+ test(s"${testPrefix}SPARK-54022: cached table after external drop and " +
+ "recreate sees empty table") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ session.table(testTable).cache()
+ assertTableCached(session, testTable)
+ checkRows(session.table(testTable), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val originalTableId = catalog.loadTable(testIdent).id
+
+ // Drop and recreate the table with the same schema, bypassing the session.
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // Sanity check: the recreated table has a different id than the dropped one.
+ val newTableId = catalog.loadTable(testIdent).id
+ assert(originalTableId != newTableId)
+
+ val result = session.table(testTable)
+ assert(result.schema.fieldNames.toSeq == Seq("id", "salary"))
+ checkRows(result, Seq.empty)
+
+ // External drop+recreate produces a new table identity, so the prior
cache entry
+ // is unreachable via name lookup (unlike external write/schema change
where the
+ // cache stays pinned).
+ assert(!session.catalog.isCached(testTable))
+ // NOTE(review): the CacheManager entry for the dropped table is not uncached
+ // explicitly here; confirm test/session teardown clears it.
+
+ session.sql(s"REFRESH TABLE $testTable").collect()
+ checkRows(session.table(testTable), Seq.empty)
+ }
+ }
+ }
+
+ // Scenario 5 with a caching connector (cachingcat)
+ test(s"${testPrefix}SPARK-54022: connector w/ cache: cached table stale
after " +
+ "external drop and recreate") {
+ withTestSession { session =>
+ withTestTableAndViews(session, cachingTestTable) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+
+ session.table(cachingTestTable).cache()
+ assertTableCached(session, cachingTestTable)
+ checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+ val catalog =
+ getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ // NOTE(review): originalTableId is never used below; consider asserting
+ // catalog.loadTable(testIdent).id == originalTableId after the recreate to
+ // pin the "loadTable still returns the old object" claim made further down.
+ val originalTableId = catalog.loadTable(testIdent).id
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // CachingInMemoryTableCatalog does not invalidate on drop/create, so
loadTable
+ // still returns the old cached table object. CacheManager still
matches and
+ // serves the stale cached data.
+ assertTableCached(session, cachingTestTable)
+ checkRows(session.table(cachingTestTable), Seq(Row(1, 100)))
+
+ // REFRESH TABLE calls invalidateTable (clears connector cache) and
rebuilds
+ // the CacheManager entry, so the new empty table becomes visible.
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+ checkRows(session.table(cachingTestTable), Seq.empty)
+ }
+ }
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 4d16339e09de..4591dca7abaf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.connector.catalog.{BufferedRows,
CatalogV2Util, Iden
* is a Connect client and catalog access requires the server session).
*
* Concrete suites override the abstract methods and mix in a test trait such
as
- * [[DSv2TempViewWithStoredPlanTests]] or [[DSv2RepeatedTableAccessTests]].
+ * [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedTableAccessTests]],
+ * or [[DSv2CacheTableReadTests]].
*/
trait DSv2ExternalMutationTestBase extends QueryTest {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
index 1a5229258e7d..9f8a93e30550 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -33,8 +33,6 @@ import org.apache.spark.sql.types.{IntegerType, LongType,
StringType}
*/
trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
- // Uses testTable, cachingTestTable, and testIdent from
DSv2ExternalMutationTestBase.
-
// Scenario 1.1 (session write)
test(s"${testPrefix}temp view with stored plan reflects session write") {
withTestSession { session =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 139a6c75d793..372be8cfe308 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -48,7 +48,8 @@ import org.apache.spark.unsafe.types.UTF8String
class DataSourceV2DataFrameSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests
= false)
with DSv2TempViewWithStoredPlanTests
- with DSv2RepeatedTableAccessTests {
+ with DSv2RepeatedTableAccessTests
+ with DSv2CacheTableReadTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]