This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 48b3109b0ebe [SPARK-56619][SQL][TEST] Add DSv2 repeated table access 
tests with internal/external changes in Classic/Connect mode
48b3109b0ebe is described below

commit 48b3109b0ebe24ace04e712574e2310a8a9e76dd
Author: Thang Long Vu <[email protected]>
AuthorDate: Wed May 27 09:28:05 2026 +0800

    [SPARK-56619][SQL][TEST] Add DSv2 repeated table access tests with 
internal/external changes in Classic/Connect mode
    
    ### What changes were proposed in this pull request?
    
    Add 9 tests verifying that repeated `sql()` calls on DSv2 tables correctly 
reflect external changes made via the catalog API, covering both classic and 
Connect modes (so 18 tests in total)
    
    `DSv2RepeatedTableAccessTests` extends `DSv2ExternalMutationTestBase`, 
following the same pattern as `DSv2TempViewWithStoredPlanTests` (PR #55571). 
The tests cover three external mutation scenarios, each with a session mutation 
baseline, an external mutation test, and a caching-connector variant:
    
    - **Scenario 1 (external writes)**: External data appended via the catalog 
API is visible to subsequent `sql()` queries.
    - **Scenario 2 (external schema changes)**: External ADD COLUMN via the 
catalog API is visible to subsequent `sql()` queries.
    - **Scenario 3 (external drop/recreate)**: External drop and recreate via 
the catalog API resolves to the new empty table.
    
    For each scenario, the caching-connector variant (`cachingcat`) 
demonstrates that a connector with its own `loadTable` cache returns stale 
results until `REFRESH TABLE` invalidates the cache.
    
    #### New files
    
    - **`DSv2RepeatedTableAccessTests`**: Shared trait containing all 9 tests 
(3 scenarios x 3 variants: session mutation baseline, external mutation, 
caching connector), using `session.sql(...)` with `.collect()` calls (harmless 
in classic mode, required for Connect).
    
    #### Modified files
    
    - **`DataSourceV2DataFrameSuite`**: Mixes in `DSv2RepeatedTableAccessTests` 
(classic runner, `testPrefix = ""`).
    - **`DataSourceV2TempViewConnectSuite` -> 
`DataSourceV2DataFrameConnectSuite`**: Renamed and now mixes in both 
`DSv2TempViewWithStoredPlanTests` and `DSv2RepeatedTableAccessTests`. This is 
the Connect counterpart of the classic `DataSourceV2DataFrameSuite`.
    - **`DSv2ExternalMutationTestBase`**: Scaladoc updated to reference both 
consumer traits.
    
    ### Why are the changes needed?
    
    These tests document and lock down the expected behavior: repeated `sql()` 
access without CACHE TABLE always sees the latest table state after external 
mutations. This prevents regressions if internal resolution or caching logic 
changes. The trait extraction enables Connect-mode reuse without duplicating 
test logic.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR is test-only.
    
    ### How was this patch tested?
    
    9 new tests run in both classic and Connect modes:
    
    Classic:
    ```
    build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite -- -z "repeated sql()"'
    ```
    
    Connect:
    ```
    build/sbt 'connect/server/testOnly *DataSourceV2DataFrameConnectSuite -- -z 
"repeated sql()"'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55462 from longvu-db/dsv2-pr2-repeated-sql.
    
    Authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit e40d17535e6b52c71e5fde9de3e23dc9680b7c41)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 ...ala => DataSourceV2DataFrameConnectSuite.scala} |  14 +-
 .../connector/DSv2ExternalMutationTestBase.scala   |  13 +-
 .../connector/DSv2RepeatedTableAccessTests.scala   | 222 ++++++++++++++++++++
 .../DSv2TempViewWithStoredPlanTests.scala          | 228 ++++++++++-----------
 .../sql/connector/DataSourceV2DataFrameSuite.scala |   3 +-
 5 files changed, 358 insertions(+), 122 deletions(-)

diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
similarity index 85%
rename from 
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
rename to 
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index ce947379b233..a13e953460a7 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,17 +21,21 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
+import org.apache.spark.sql.connector.{DSv2RepeatedTableAccessTests, 
DSv2TempViewWithStoredPlanTests}
 import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
InMemoryTableCatalog, TableCatalog}
 
 /**
- * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]]. All test logic 
lives in the shared
- * trait; this class only provides the Connect-specific session, catalog 
access, and result
+ * Connect-mode counterpart of 
[[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
+ *
+ * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]) and 
repeated table access tests
+ * ([[DSv2RepeatedTableAccessTests]]) under Spark Connect. All test logic 
lives in the shared
+ * traits; this class only provides the Connect-specific session, catalog 
access, and result
  * comparison.
  */
-class DataSourceV2TempViewConnectSuite
+class DataSourceV2DataFrameConnectSuite
     extends SparkConnectServerTest
-    with DSv2TempViewWithStoredPlanTests {
+    with DSv2TempViewWithStoredPlanTests
+    with DSv2RepeatedTableAccessTests {
 
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 2e60c24c4460..4d16339e09de 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -33,11 +33,20 @@ import 
org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Iden
  * (where the test session IS the server session) and Connect mode (where the 
test session
  * is a Connect client and catalog access requires the server session).
  *
- * Concrete suites override the abstract methods and mix in the test trait
- * [[DSv2TempViewWithStoredPlanTests]].
+ * Concrete suites override the abstract methods and mix in a test trait such 
as
+ * [[DSv2TempViewWithStoredPlanTests]] or [[DSv2RepeatedTableAccessTests]].
  */
 trait DSv2ExternalMutationTestBase extends QueryTest {
 
+  /** Fully qualified table name under the non-caching test catalog. */
+  protected val testTable: String = "testcat.ns1.ns2.tbl"
+
+  /** Fully qualified table name under the caching test catalog. */
+  protected val cachingTestTable: String = "cachingcat.ns1.ns2.tbl"
+
+  /** Identifier for the test table within its namespace. */
+  protected val testIdent: Identifier = Identifier.of(Array("ns1", "ns2"), 
"tbl")
+
   /** Prefix for test names, e.g. "" or "[connect] ". */
   protected def testPrefix: String
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
new file mode 100644
index 000000000000..533d10a94979
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Shared repeated table access tests with external changes for DSv2 tables. 
These tests verify
+ * that repeated `sql()` calls correctly reflect both session and external 
mutations:
+ *
+ *  - Scenario 1 (external writes): external data appended via the catalog API 
is visible.
+ *  - Scenario 2 (external schema changes): external ADD COLUMN via the 
catalog API is visible.
+ *  - Scenario 3 (external drop/recreate): external drop and recreate via the 
catalog API
+ *    resolves to the new empty table.
+ *
+ * Each scenario includes a session mutation baseline, an external mutation 
test, and a
+ * caching-connector variant showing stale results until `REFRESH TABLE`.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect 
client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode 
`.collect()` on
+ * DDL / DML is a no-op (these execute eagerly), so this is harmless.
+ */
+trait DSv2RepeatedTableAccessTests extends DSv2ExternalMutationTestBase {
+
+  // Uses testTable, cachingTestTable, and testIdent from 
DSv2ExternalMutationTestBase.
+
+  // Scenario 1: data changes via writes
+
+  test(s"${testPrefix}repeated sql() reflects session write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100), 
Row(2, 200)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100), 
Row(2, 200)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external 
write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, cachingTestTable) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        // Caching connector returns stale table: external write invisible
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        // REFRESH TABLE invalidates the connector cache, external write 
becomes visible
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 2: schema changes
+
+  test(s"${testPrefix}repeated sql() reflects session schema change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN new_col INT").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
+        checkRows(
+          session.sql(s"SELECT * FROM $testTable"),
+          Seq(Row(1, 100, null), Row(2, 200, -1)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external schema change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+        catalog.alterTable(testIdent, addCol)
+
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        checkRows(
+          session.sql(s"SELECT * FROM $testTable"),
+          Seq(Row(1, 100, null), Row(2, 200, -1)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external 
schema change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, cachingTestTable) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+        catalog.alterTable(testIdent, addCol)
+
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        // Caching connector returns stale table: external changes invisible
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        // REFRESH TABLE invalidates the connector cache, schema change + data 
visible
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+        checkRows(
+          session.sql(s"SELECT * FROM $cachingTestTable"),
+          Seq(Row(1, 100, null), Row(2, 200, -1)))
+      }
+    }
+  }
+
+  // Scenario 3: drop and recreate table
+
+  test(s"${testPrefix}repeated sql() reflects session drop/recreate") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        session.sql(s"DROP TABLE $testTable").collect()
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq.empty)
+      }
+    }
+  }
+
+  test(s"${testPrefix}repeated sql() reflects external drop/recreate") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        checkRows(session.sql(s"SELECT * FROM $testTable"), Seq.empty)
+      }
+    }
+  }
+
+  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external 
drop/recreate") {
+    withTestSession { session =>
+      withTestTableAndViews(session, cachingTestTable) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        // Caching connector returns stale table: drop/recreate invisible
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1, 
100)))
+
+        // REFRESH TABLE invalidates the connector cache, new empty table 
visible
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+        checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq.empty)
+      }
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
index eb40e3ac056f..1a5229258e7d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, InMemoryTableCatalog, TableChange, TableInfo}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
 
 /**
@@ -33,21 +33,19 @@ import org.apache.spark.sql.types.{IntegerType, LongType, 
StringType}
  */
 trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
 
-  private val T = "testcat.ns1.ns2.tbl"
-  private val CT = "cachingcat.ns1.ns2.tbl"
-  private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+  // Uses testTable, cachingTestTable, and testIdent from 
DSv2ExternalMutationTestBase.
 
   // Scenario 1.1 (session write)
   test(s"${testPrefix}temp view with stored plan reflects session write") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
-        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
         checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
       }
     }
@@ -56,11 +54,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 1.2 (external write)
   test(s"${testPrefix}temp view with stored plan reflects external write") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -74,11 +72,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 1.2 connector w/ cache (external write, caching connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external 
write") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -88,7 +86,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, external write 
becomes visible
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
       }
     }
@@ -97,15 +95,15 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 2.1 (session ADD COLUMN)
   test(s"${testPrefix}temp view with stored plan preserves schema after 
session ADD COLUMN") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
-        session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect()
-        session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect()
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column 
INT").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
 
         // view preserves original 2-column schema, filter still applied
         checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
@@ -116,11 +114,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 2.2 (external ADD COLUMN)
   test(s"${testPrefix}temp view with stored plan preserves schema after 
external ADD COLUMN") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // external schema change via catalog API
@@ -139,11 +137,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external ADD 
COLUMN") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -156,7 +154,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, view preserves 
original 2-column schema
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
       }
     }
@@ -165,14 +163,14 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 3.1 (session column removal)
   test(s"${testPrefix}temp view with stored plan detects session column 
removal") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
-        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+        session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
 
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
@@ -189,11 +187,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 3.2 (external column removal)
   test(s"${testPrefix}temp view with stored plan detects external column 
removal") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -215,11 +213,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 3.2 connector w/ cache (external column removal, caching 
connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external 
column removal") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -230,7 +228,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, column removal 
detected
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
           condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
@@ -246,18 +244,18 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 4.1 (session drop and recreate table)
   test(s"${testPrefix}temp view with stored plan resolves to session-recreated 
table") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
         val originalTableId = catalog.loadTable(testIdent).id
 
-        session.sql(s"DROP TABLE $T").collect()
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"DROP TABLE $testTable").collect()
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
 
         val newTableId = catalog.loadTable(testIdent).id
         assert(originalTableId != newTableId)
@@ -265,7 +263,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         // view resolves to the new empty table
         checkRows(session.table("v"), Seq.empty)
 
-        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
         checkRows(session.table("v"), Seq(Row(2, 200)))
       }
     }
@@ -274,11 +272,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 4.2 (external drop and recreate table)
   test(s"${testPrefix}temp view with stored plan resolves to externally 
recreated table") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -299,7 +297,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         // view resolves to the new empty table
         checkRows(session.table("v"), Seq.empty)
 
-        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
         checkRows(session.table("v"), Seq(Row(2, 200)))
       }
     }
@@ -308,11 +306,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 4.2 connector w/ cache (external drop/recreate, caching 
connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external 
drop/recreate") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -329,7 +327,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, view resolves to new 
empty table
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkRows(session.table("v"), Seq.empty)
       }
     }
@@ -339,20 +337,21 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   test(s"${testPrefix}temp view with stored plan after session drop and re-add 
column same type" +
       " with unfiltered view") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
-
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
-        session.table(T).createOrReplaceTempView("v_no_filter")
-        session.table(T).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
+      withTestTableAndViews(session, testTable, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
+
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
+        session.table(testTable).createOrReplaceTempView("v_no_filter")
+        session.table(testTable).filter("salary IS NULL")
+          .createOrReplaceTempView("v_filter_is_null")
         checkRows(session.table("v"), Seq(Row(1, 100)))
         checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 
1000)))
         checkRows(session.table("v_filter_is_null"), Seq.empty)
 
         // drop and re-add column with same name and type
-        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
-        session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect()
+        session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN salary INT").collect()
 
         // salary values are now null, so the filtered view returns nothing
         checkRows(session.table("v"), Seq.empty)
@@ -368,13 +367,14 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   test(s"${testPrefix}temp view with stored plan after external drop and 
re-add column " +
       "same type") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
-
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
-        session.table(T).createOrReplaceTempView("v_no_filter")
-        session.table(T).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
+      withTestTableAndViews(session, testTable, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
+
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
+        session.table(testTable).createOrReplaceTempView("v_no_filter")
+        session.table(testTable).filter("salary IS NULL")
+          .createOrReplaceTempView("v_filter_is_null")
         checkRows(session.table("v"), Seq(Row(1, 100)))
         checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 
1000)))
         checkRows(session.table("v_filter_is_null"), Seq.empty)
@@ -399,11 +399,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   test(s"${testPrefix}connector w/ cache: temp view stale after external 
drop/re-add column " +
       "same type") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -415,7 +415,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, salary values are 
null
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkRows(session.table("v"), Seq.empty)
       }
     }
@@ -424,15 +424,15 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 6.1 (session drop and re-add column with different type)
   test(s"${testPrefix}temp view with stored plan detects session column type 
change") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
-        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
-        session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect()
+        session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN salary 
STRING").collect()
 
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
@@ -449,11 +449,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 6.2 (external drop and re-add column with different type)
   test(s"${testPrefix}temp view with stored plan detects external column type 
change") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -476,11 +476,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 6.2 connector w/ cache (external column type change, caching 
connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external 
column type change") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -492,7 +492,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, type change detected
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
           condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
@@ -508,14 +508,14 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 7.1 (session type widening from INT to BIGINT)
   test(s"${testPrefix}temp view with stored plan detects session type 
widening") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
-        session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect()
+        session.sql(s"ALTER TABLE $testTable ALTER COLUMN salary TYPE 
LONG").collect()
 
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
@@ -532,11 +532,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 7.2 (external type widening from INT to BIGINT)
   test(s"${testPrefix}temp view with stored plan detects external type 
widening") {
     withTestSession { session =>
-      withTestTableAndViews(session, T, Seq("v")) {
-        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, testTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(testTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -558,11 +558,11 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
   // Scenario 7.2 connector w/ cache (external type widening, caching 
connector)
   test(s"${testPrefix}connector w/ cache: temp view stale after external type 
widening") {
     withTestSession { session =>
-      withTestTableAndViews(session, CT, Seq("v")) {
-        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
-        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+      withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+        session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT) 
USING foo").collect()
+        session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10, 
1000)").collect()
 
-        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(cachingTestTable).filter("salary < 
999").createOrReplaceTempView("v")
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
@@ -573,7 +573,7 @@ trait DSv2TempViewWithStoredPlanTests extends 
DSv2ExternalMutationTestBase {
         checkRows(session.table("v"), Seq(Row(1, 100)))
 
         // REFRESH TABLE invalidates the connector cache, type change detected
-        session.sql(s"REFRESH TABLE $CT").collect()
+        session.sql(s"REFRESH TABLE $cachingTestTable").collect()
         checkError(
           exception = intercept[AnalysisException] { 
session.table("v").collect() },
           condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 13f8a3455480..139a6c75d793 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -47,7 +47,8 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class DataSourceV2DataFrameSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests 
= false)
-  with DSv2TempViewWithStoredPlanTests {
+  with DSv2TempViewWithStoredPlanTests
+  with DSv2RepeatedTableAccessTests {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to