This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 805b086d53e9 [SPARK-56618][SQL][TEST] Add DSv2 join refresh tests for incrementally constructed queries
805b086d53e9 is described below

commit 805b086d53e9c4b400e0204f0f2013cd340d00db
Author: Thang Long Vu <[email protected]>
AuthorDate: Sun May 31 14:04:58 2026 +0800

    [SPARK-56618][SQL][TEST] Add DSv2 join refresh tests for incrementally constructed queries
    
    ### What changes were proposed in this pull request?
    
    Add tests for incrementally constructed queries to 
`DataSourceV2DataFrameSuite` and `DataSourceV2DataFrameConnectSuite`, along 
with new test catalogs.
    
    When DataFrames are analyzed at different times and then joined 
(self-join), the refresh phase in QueryExecution must align all table 
references to the same version. These tests verify 6 core scenarios with 
catalog ID support variants:
    
    - **Scenario 1** (external + same-session insert): Both sides of the 
self-join see the latest version after an INSERT between df1 and df2 creation.
    - **Scenario 2** (external + same-session ADD COLUMN): In classic mode, df1 
keeps its original 2-column schema while df2 has the new 3-column schema; both 
see the latest data. In Connect mode, both sides re-resolve with the new 
3-column schema.
    - **Scenario 3** (external + same-session DROP COLUMN): In classic mode, 
removing a column that df1 references throws `COLUMNS_MISMATCH`. In Connect 
mode, both sides re-resolve without the dropped column and the join succeeds.
    - **Scenario 4a** (external drop/recreate, table with both table and column 
ID support): In classic mode, drop+recreate produces a new table ID; join 
throws `TABLE_ID_MISMATCH`. In Connect mode, both sides re-resolve to the 
recreated table.
    - **Scenario 4b** (external drop/recreate, table without table ID support, 
but with column ID support): In classic mode, column IDs detect the recreate; 
join throws `COLUMN_ID_MISMATCH`. In Connect mode, both sides re-resolve to the 
recreated table.
    - **Scenario 4c** (external drop/recreate, table without table ID support 
and without column ID support): Neither ID check fires, so the drop/recreate 
goes undetected. In classic mode, df1 keeps its pre-drop snapshot while df2 
reads the recreated table, so the join finds no matching ids and returns no 
rows. In Connect mode, both sides re-resolve to the recreated table and the 
join sees the row appended after recreate.
    - **Scenario 5a** (external drop+re-add column, table without table ID 
support, but with column ID support): In classic mode, two separate alterTable 
calls assign a fresh column ID; join throws `COLUMN_ID_MISMATCH`. In Connect 
mode, both sides re-resolve with the new column ID.
    - **Scenario 5b** (external drop+re-add column, table without table ID 
support and without column ID support): Neither ID check fires; join succeeds 
(change goes undetected). Same behavior in both modes.
    - **Scenario 6** (external type change, table with both table and column ID 
support): In classic mode, the delete removes the old column ID and the add 
assigns a fresh one, so the column ID check fires; join throws 
`COLUMN_ID_MISMATCH`. In Connect mode, both sides re-resolve with the new 
column type.
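    
    A minimal sketch of how scenarios 4a-4c differ in classic mode, assuming the table ID check takes precedence over the column ID check. `classicDropRecreateOutcome` and its parameters are hypothetical names used only for illustration, not Spark APIs. The table-ID-only combination is an assumption, since the scenarios above do not cover it.
    
    ```scala
    object DropRecreateOutcomeSketch {
      // Hypothetical helper: which check is expected to fire in classic mode
      // when a table is dropped and recreated between df1 and df2.
      def classicDropRecreateOutcome(
          supportsTableId: Boolean,
          supportsColumnId: Boolean): String = {
        if (supportsTableId) "TABLE_ID_MISMATCH"        // scenario 4a
        else if (supportsColumnId) "COLUMN_ID_MISMATCH" // scenario 4b
        else "undetected"                               // scenario 4c
      }
    
      def main(args: Array[String]): Unit = {
        assert(classicDropRecreateOutcome(true, true) == "TABLE_ID_MISMATCH")
        assert(classicDropRecreateOutcome(false, true) == "COLUMN_ID_MISMATCH")
        assert(classicDropRecreateOutcome(false, false) == "undetected")
      }
    }
    ```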
    
    #### Test architecture
    
    All tests live in the shared `DSv2IncrementallyConstructedQueryTests` trait:
    - **Scenarios 1, 5b** produce identical results in both classic and Connect 
modes.
    - **Scenarios 2, 3, 4a, 4b, 4c, 5a, 6** use `if (isConnect)` to assert the 
correct expected result for each mode, since Connect defers resolution until 
execution while classic preserves the schema captured at DataFrame creation 
time.
    - **`DataSourceV2DataFrameConnectSuite`** mixes in 
`DSv2IncrementallyConstructedQueryTests` alongside the existing 
`DSv2TempViewWithStoredPlanTests`, `DSv2RepeatedTableAccessTests`, and 
`DSv2CacheTableReadTests` traits.
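    
    The mode-specific assertion pattern can be sketched roughly as follows. This is a simplified stand-in, not the real trait: `ModeAwareSketch`, `ClassicSketch`, `ConnectSketch`, and `expectedSelfJoinColumns` are hypothetical names, and the column counts are the scenario 2 expectations (2 + 3 in classic mode, 3 + 3 in Connect mode).
    
    ```scala
    // Hypothetical stand-in for the shared test trait; only the mode flag is modeled.
    trait ModeAwareSketch {
      protected def isConnect: Boolean
    
      // Scenario 2: Connect re-resolves df1 with 3 columns, classic keeps 2.
      def expectedSelfJoinColumns: Int = if (isConnect) 6 else 5
    }
    
    object ClassicSketch extends ModeAwareSketch {
      override protected def isConnect: Boolean = false
    }
    
    object ConnectSketch extends ModeAwareSketch {
      override protected def isConnect: Boolean = true
    }
    
    object ModeAwareSketchMain {
      def main(args: Array[String]): Unit = {
        assert(ClassicSketch.expectedSelfJoinColumns == 5)
        assert(ConnectSketch.expectedSelfJoinColumns == 6)
      }
    }
    ```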
    
    #### Behavioral corrections in existing tests
    
    Three existing null-column-ID tests had their expected results corrected from `Row(1, 100)` to `Row(1, null)`. Version tracking now refreshes the table after a drop+re-add of a column, so the re-added salary column (with a fresh column ID) returns null rather than the stale value. This correction follows from the `setVersionAndValidatedVersionFrom` addition described below.
    
    #### Shared test infrastructure changes
    
    - **`NullTableIdAndNullColumnIdInMemoryTableCatalog`** (new): Test catalog 
that returns `null` for both table ID and column ID, simulating catalogs 
without ID support for scenarios 4c and 5b.
    - **`InMemoryBaseTable`**: Added `setVersionAndValidatedVersionFrom` 
method. Test catalogs that decorate tables (e.g. 
`NullColumnIdInMemoryTableCatalog`, 
`NullTableIdAndNullColumnIdInMemoryTableCatalog`) create new objects that start 
at version 0. Without this call, `V2TableRefreshUtil` would see version 0 on 
every load and never detect that the table has changed, breaking stale-table 
refresh for incrementally constructed queries.
    - **`InMemoryTable`**: Refactored `copy()` to use 
`setVersionAndValidatedVersionFrom`, ensuring version-copy consistency.
    - **`DSv2ExternalMutationTestBase`**: Added `isConnect` helper for 
mode-specific assertions.
    - **`TypeChangeResetsColIdTableCatalog`**, 
**`NullColumnIdInMemoryTableCatalog`**, **`MixedColumnIdTableCatalog`**, 
**`ComposedColumnIdTableCatalog`**, **`InMemoryRowLevelOperationTableCatalog`** 
(modified): Added `setVersionAndValidatedVersionFrom` calls to propagate 
version state consistently across all custom catalog table types.
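    
    A minimal sketch of why the version has to be copied when a catalog wraps a table in a new object. `VersionedTable`, `decorate`, and `setVersionFrom` are hypothetical stand-ins for illustration only. They model just the version counter and are not the `InMemoryBaseTable` API.
    
    ```scala
    // Hypothetical stand-in for a versioned test table; the counter starts at 0.
    final class VersionedTable(val name: String) {
      private var tableVersion: Int = 0
      def version: Int = tableVersion
      def increaseVersion(): Unit = { tableVersion += 1 }
      def setVersionFrom(source: VersionedTable): Unit = { tableVersion = source.version }
    }
    
    object VersionPropagationSketch {
      // Wraps a table in a fresh object, as a decorating test catalog would.
      def decorate(source: VersionedTable, propagate: Boolean): VersionedTable = {
        val wrapped = new VersionedTable(source.name)
        if (propagate) wrapped.setVersionFrom(source)
        wrapped
      }
    
      def main(args: Array[String]): Unit = {
        val base = new VersionedTable("t")
        base.increaseVersion()
        base.increaseVersion()
        // Without propagation the wrapper looks unchanged (version 0) on every load.
        assert(decorate(base, propagate = false).version == 0)
        // With propagation the wrapper reports the source's version.
        assert(decorate(base, propagate = true).version == 2)
      }
    }
    ```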
    
    ### Why are the changes needed?
    
    The existing tests covered scenarios 1-3 within larger multi-step tests but 
did not cover:
    - Table identity detection via table ID and column ID (scenarios 4a-4c)
    - Column identity detection via column ID (scenarios 5a-5b)
    - Type change detection via column ID (scenario 6)
    - The case where neither table ID nor column ID is supported, showing the 
limitation (scenarios 4c, 5b)
    - The behavioral difference between classic and Connect modes for 
incrementally constructed queries
    
    Covering these scenarios verifies that DSv2 table refresh behaves correctly for incrementally constructed queries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR only adds tests and test-only catalogs.
    
    ### How was this patch tested?
    
    New tests in `DSv2IncrementallyConstructedQueryTests` trait, run via 
`DataSourceV2DataFrameSuite` (classic) and `DataSourceV2DataFrameConnectSuite` 
(Connect).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55463 from longvu-db/dsv2-pr3-joins.
    
    Authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit d76d7f5f6367518b4cb96b796a2c6ac83ec8303a)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalog/ComposedColumnIdTableCatalog.scala     |   2 +
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  19 +
 .../InMemoryRowLevelOperationTableCatalog.scala    |   2 +
 .../sql/connector/catalog/InMemoryTable.scala      |   5 +-
 .../catalog/MixedColumnIdTableCatalog.scala        |   2 +
 .../catalog/NullColumnIdInMemoryTableCatalog.scala |   2 +
 ...bleIdAndNullColumnIdInMemoryTableCatalog.scala} |  61 ++-
 .../TypeChangeResetsColIdTableCatalog.scala        |   1 +
 .../DataSourceV2DataFrameConnectSuite.scala        |  19 +-
 .../connector/DSv2ExternalMutationTestBase.scala   |   5 +-
 .../DSv2IncrementallyConstructedQueryTests.scala   | 495 +++++++++++++++++++++
 .../sql/connector/DataSourceV2DataFrameSuite.scala |  25 +-
 12 files changed, 587 insertions(+), 51 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
index 0ef1a1970dea..64488a76db7f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
@@ -96,6 +96,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
       table.constraints,
       id = table.id)
     composedTable.alterTableWithData(table.data, table.schema)
+    composedTable.setVersionAndValidatedVersionFrom(table)
     tables.put(ident, composedTable)
     composedTable
   }
@@ -157,6 +158,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
       alteredTable.constraints,
       id = alteredTable.id)
     composedTable.alterTableWithData(alteredTable.data, alteredTable.schema)
+    composedTable.setVersionAndValidatedVersionFrom(alteredTable)
     tables.put(ident, composedTable)
     composedTable
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 06997662fd8b..6b09c0090de1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -97,6 +97,25 @@ abstract class InMemoryBaseTable(
     tableVersion = version.toInt
   }
 
+  /**
+   * Copies version and validated version from another table.
+   *
+   * Some test catalogs (e.g. [[NullColumnIdInMemoryTableCatalog]],
+   * [[NullTableIdAndNullColumnIdInMemoryTableCatalog]]) create a new table object
+   * that overrides specific behavior (such as nulling out column IDs). The new
+   * object's version counter starts at 0. Without this call, the version counter
+   * resets every time the catalog creates such a replacement table, breaking the
+   * monotonic-version assumption that downstream consumers rely on (e.g.
+   * [[InMemoryTable]].copy, validated-version propagation, and the join-refresh
+   * tests in [[DSv2IncrementallyConstructedQueryTests]]).
+   */
+  def setVersionAndValidatedVersionFrom(sourceTable: InMemoryBaseTable): Unit = {
+    setVersion(sourceTable.version())
+    if (sourceTable.validatedVersion() != null) {
+      setValidatedVersion(sourceTable.validatedVersion())
+    }
+  }
+
   def increaseVersion(): Unit = {
     tableVersion += 1
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index 81f976dce510..285a2891bc93 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -82,6 +82,7 @@ class InMemoryRowLevelOperationTableCatalog
       constraints = constraints,
       tableId = table.id)
     newTable.alterTableWithData(table.data, schema)
+    newTable.setVersionAndValidatedVersionFrom(table)
 
     tables.put(ident, newTable)
 
@@ -123,6 +124,7 @@ class PartialSchemaEvolutionCatalog extends InMemoryRowLevelOperationTableCatalo
       properties = properties,
       constraints = table.constraints)
     newTable.alterTableWithData(table.data, table.schema)
+    newTable.setVersionAndValidatedVersionFrom(table)
     tables.put(ident, newTable)
     newTable
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 66db9c18fa98..c783bfbece14 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -158,10 +158,7 @@ class InMemoryTable(
 
     copiedTable.commits ++= commits.map(_.copy())
 
-    copiedTable.setVersion(version())
-    if (validatedVersion() != null) {
-      copiedTable.setValidatedVersion(validatedVersion())
-    }
+    copiedTable.setVersionAndValidatedVersionFrom(this)
 
     copiedTable
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
index 70898c98afb8..f77cad3c077d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
@@ -50,6 +50,7 @@ class MixedColumnIdTableCatalog extends InMemoryTableCatalog {
       id = table.id,
       nullIdNames = snapshot)
     mixedTable.alterTableWithData(table.data, table.schema)
+    mixedTable.setVersionAndValidatedVersionFrom(table)
     mixedTable
   }
 
@@ -120,6 +121,7 @@ class MixedColumnIdInMemoryTable(
     dataMap.synchronized {
       copiedTable.alterTableWithData(data, schema)
     }
+    copiedTable.setVersionAndValidatedVersionFrom(this)
     copiedTable
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
index ee544f4da0e7..c26ce263c1f8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
@@ -45,6 +45,7 @@ class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
       constraints = table.constraints,
       id = table.id)
     nullColIdTable.alterTableWithData(table.data, table.schema)
+    nullColIdTable.setVersionAndValidatedVersionFrom(table)
     nullColIdTable
   }
 
@@ -100,6 +101,7 @@ class NullColumnIdInMemoryTable(
     dataMap.synchronized {
       copiedTable.alterTableWithData(data, schema)
     }
+    copiedTable.setVersionAndValidatedVersionFrom(this)
     copiedTable
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
similarity index 58%
copy from sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
copy to sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
index ee544f4da0e7..df7964f63b85 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
@@ -22,84 +22,79 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.internal.connector.ColumnImpl
 
 /**
- * An [[InMemoryTableCatalog]] that strips column IDs from all columns
- * ([[Column.id]] returns null). This simulates connectors that do not
- * support column identity tracking.
+ * An [[InMemoryTableCatalog]] that strips both table IDs ([[Table.id]]
+ * returns null) and column IDs ([[Column.id]] returns null). This simulates
+ * connectors that support neither table nor column identity tracking.
  *
- * Tables are stored as [[NullColumnIdInMemoryTable]] instances that
- * override [[columns]] to strip IDs. Data is copied from the table
- * created by the parent [[InMemoryTableCatalog]].
- *
- * When column IDs are null, [[V2TableUtil.validateColumnIds]]
- * skips validation entirely, meaning drop/re-add of a column is NOT
- * detected via column IDs.
+ * When both IDs are null, neither the table identity check in [[V2TableRefreshUtil]]
+ * nor [[V2TableUtil.validateColumnIds]] fires, so drop/recreate of a table or
+ * drop/re-add of a column goes undetected.
  */
-class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
+class NullTableIdAndNullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
 
-  private def toNullColumnIdTable(table: InMemoryTable): NullColumnIdInMemoryTable = {
-    val nullColIdTable = new NullColumnIdInMemoryTable(
+  private def toNullIdsTable(
+      table: InMemoryTable): NullTableIdAndNullColumnIdInMemoryTable = {
+    val nullTable = new NullTableIdAndNullColumnIdInMemoryTable(
       name = table.name,
       columns = table.columns(),
       partitioning = table.partitioning,
       properties = table.properties,
-      constraints = table.constraints,
-      id = table.id)
-    nullColIdTable.alterTableWithData(table.data, table.schema)
-    nullColIdTable
+      constraints = table.constraints)
+    nullTable.alterTableWithData(table.data, table.schema)
+    nullTable.setVersionAndValidatedVersionFrom(table)
+    nullTable
   }
 
   override def createTable(
       ident: Identifier,
       info: TableInfo): Table = {
     val table = super.createTable(ident, info).asInstanceOf[InMemoryTable]
-    val nullColIdTable = toNullColumnIdTable(table)
-    tables.put(ident, nullColIdTable)
-    nullColIdTable
+    val nullTable = toNullIdsTable(table)
+    tables.put(ident, nullTable)
+    nullTable
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = super.alterTable(ident, changes: _*).asInstanceOf[InMemoryTable]
-    val nullColIdTable = toNullColumnIdTable(table)
-    tables.put(ident, nullColIdTable)
-    nullColIdTable
+    val nullTable = toNullIdsTable(table)
+    tables.put(ident, nullTable)
+    nullTable
   }
 }
 
 /**
- * An [[InMemoryTable]] whose [[columns]] method always returns null
- * column IDs, simulating a connector that does not support column
- * identity tracking.
+ * An [[InMemoryTable]] with both null table ID and null column IDs,
+ * simulating a connector that supports neither identity tracking mechanism.
  */
-class NullColumnIdInMemoryTable(
+class NullTableIdAndNullColumnIdInMemoryTable(
     name: String,
     columns: Array[Column],
     partitioning: Array[Transform],
     properties: java.util.Map[String, String],
-    constraints: Array[Constraint] = Array.empty,
-    override val id: String = java.util.UUID.randomUUID().toString)
+    constraints: Array[Constraint] = Array.empty)
   extends InMemoryTable(
     name = name,
     columns = columns,
     partitioning = partitioning,
     properties = properties,
     constraints = constraints,
-    id = id) {
+    id = null) {
 
   override def columns(): Array[Column] = {
     super.columns().map(_.asInstanceOf[ColumnImpl].copy(id = null))
   }
 
   override def copy(): Table = {
-    val copiedTable = new NullColumnIdInMemoryTable(
+    val copiedTable = new NullTableIdAndNullColumnIdInMemoryTable(
       name,
       columns(),
       partitioning,
       properties,
-      constraints,
-      id)
+      constraints)
     dataMap.synchronized {
       copiedTable.alterTableWithData(data, schema)
     }
+    copiedTable.setVersionAndValidatedVersionFrom(this)
     copiedTable
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
index e1c54994c511..d68f2e62b136 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
@@ -63,6 +63,7 @@ class TypeChangeResetsColIdTableCatalog extends InMemoryTableCatalog {
       alteredTable.constraints,
       id = alteredTable.id)
     tableWithResetIds.alterTableWithData(alteredTable.data, alteredTable.schema)
+    tableWithResetIds.setVersionAndValidatedVersionFrom(alteredTable)
     tables.put(ident, tableWithResetIds)
     tableWithResetIds
   }
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index ce926e42ebad..1a31e5f8ac1a 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,21 +21,23 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2IncrementallyConstructedQueryTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, TableCatalog}
 
 /**
  * Connect-mode counterpart of [[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
  *
  * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]), repeated table access tests
- * ([[DSv2RepeatedTableAccessTests]]), and CACHE TABLE read tests ([[DSv2CacheTableReadTests]])
- * under Spark Connect. All test logic lives in the shared traits; this class only provides the
- * Connect-specific session, catalog access, and result comparison.
+ * ([[DSv2RepeatedTableAccessTests]]), incrementally constructed query tests
+ * ([[DSv2IncrementallyConstructedQueryTests]]), and CACHE TABLE read tests
+ * ([[DSv2CacheTableReadTests]]) under Spark Connect. All test logic lives in the shared traits;
+ * this class only provides the Connect-specific session, catalog access, and result comparison.
  */
 class DataSourceV2DataFrameConnectSuite
     extends SparkConnectServerTest
     with DSv2TempViewWithStoredPlanTests
     with DSv2RepeatedTableAccessTests
+    with DSv2IncrementallyConstructedQueryTests
     with DSv2CacheTableReadTests {
 
   override def sparkConf: SparkConf = super.sparkConf
@@ -43,8 +45,15 @@ class DataSourceV2DataFrameConnectSuite
     .set("spark.sql.catalog.testcat.copyOnLoad", "true")
     .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName)
     .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+    .set("spark.sql.catalog.nullidcat", classOf[NullTableIdInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
+    .set(
+      "spark.sql.catalog.nullbothidscat",
+      classOf[NullTableIdAndNullColumnIdInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.nullbothidscat.copyOnLoad", "true")
 
   override protected def testPrefix: String = "[connect] "
+  override protected def isConnect: Boolean = true
 
   override protected def withTestSession(fn: SparkSession => Unit): Unit =
     withSession(fn)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 4591dca7abaf..0b2a50534447 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Iden
  *
  * Concrete suites override the abstract methods and mix in a test trait such as
  * [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedTableAccessTests]],
- * or [[DSv2CacheTableReadTests]].
+ * [[DSv2IncrementallyConstructedQueryTests]], or [[DSv2CacheTableReadTests]].
  */
 trait DSv2ExternalMutationTestBase extends QueryTest {
 
@@ -51,6 +51,9 @@ trait DSv2ExternalMutationTestBase extends QueryTest {
   /** Prefix for test names, e.g. "" or "[connect] ". */
   protected def testPrefix: String
 
+  /** Whether this suite runs under Spark Connect. */
+  protected def isConnect: Boolean
+
   /** Execute a test body with a session. */
   protected def withTestSession(fn: SparkSession => Unit): Unit
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala
new file mode 100644
index 000000000000..1dbaad18e3e7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{Column, InMemoryTableCatalog, TableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Tests for incrementally constructed queries where df1 and df2 are analyzed at different
+ * times, then joined. The refresh phase in QueryExecution must align table versions across
+ * all references.
+ *
+ * Classic and Connect modes produce different results in some scenarios because in Connect
+ * mode, resolution is deferred until execution, so both sides of a join always see the
+ * latest table state.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode `.collect()` on
+ * eager statements (DDL, INSERT) is a no-op, so this is harmless.
+ */
+trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBase {
+
+  // ---------------------------------------------------------------------------
+  // Scenario 1: join after insert refreshes both sides to latest version.
+  // Both classic and Connect see the inserted data.
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join refreshes both sides after external insert" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+        val df2 = session.table(testTable)
+
+        checkRows(
+          df1.join(df2, df1("id") === df2("id")),
+          Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join refreshes both sides after same-session insert" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+
+        val df2 = session.table(testTable)
+
+        checkRows(
+          df1.join(df2, df1("id") === df2("id")),
+          Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Scenario 2: join after ADD COLUMN.
+  // Classic: df1 keeps its original 2-column schema.
+  // Connect: re-resolves df1 with the new 3-column schema.
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join after external ADD COLUMN" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        catalog.alterTable(
+          testIdent, TableChange.addColumn(Array("new_column"), IntegerType, true))
+        externalAppend(
+          catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
+
+        val df2 = session.table(testTable)
+        val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+        if (isConnect) {
+          // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+          assert(selfJoin.columns.length == 6,
+            s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+          checkRows(selfJoin,
+            Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+        } else {
+          // Classic: df1 keeps its original 2-column schema (id, salary).
+          assert(selfJoin.columns.length == 5,
+            s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+          checkRows(selfJoin,
+            Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+        }
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join after same-session ADD COLUMN" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
+
+        val df2 = session.table(testTable)
+        val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+        if (isConnect) {
+          // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+          assert(selfJoin.columns.length == 6,
+            s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+          checkRows(selfJoin,
+            Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+        } else {
+          // Classic: df1 keeps its original 2-column schema (id, salary).
+          assert(selfJoin.columns.length == 5,
+            s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+          checkRows(selfJoin,
+            Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Scenario 3: join after DROP COLUMN.
+  // Classic: df1 references the dropped column, fails with COLUMNS_MISMATCH.
+  // Connect: re-resolves df1 without the dropped column, join succeeds.
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join after external DROP COLUMN" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        catalog.alterTable(
+          testIdent, TableChange.deleteColumn(Array("salary"), false))
+        externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2))
+
+        val df2 = session.table(testTable)
+
+        if (isConnect) {
+          // Connect re-resolves df1 without the dropped column.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(1, 1), Row(2, 2)))
+        } else {
+          // Classic: df1 references the dropped column.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+            matchPVals = true,
+            parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+        }
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join after same-session DROP COLUMN" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (2)").collect()
+
+        val df2 = session.table(testTable)
+
+        if (isConnect) {
+          // Connect re-resolves df1 without the dropped column.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(1, 1), Row(2, 2)))
+        } else {
+          // Classic: df1 references the dropped column.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+            matchPVals = true,
+            parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Scenario 4: external drop and recreate table.
+  // 4a: table ID detects it, TABLE_ID_MISMATCH in classic, succeeds in Connect
+  // 4b: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect
+  // 4c: no IDs, goes undetected, join succeeds (both modes)
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join after external table drop and recreate" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val originTableId = catalog.loadTable(testIdent).id
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+        externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+        val df2 = session.table(testTable)
+        val newTableId = catalog.loadTable(testIdent).id
+        assert(originTableId != newTableId)
+
+        if (isConnect) {
+          // Connect re-resolves both sides to the recreated table.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(2, 200, 2, 200)))
+        } else {
+          // Classic: table ID changed.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+            matchPVals = true,
+            parameters = Map(
+              "tableName" -> ".*",
+              "capturedTableId" -> ".*",
+              "currentTableId" -> ".*"))
+        }
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join after external drop/recreate" +
+      " (table without table ID support, but with column ID support)") {
+    val nullIdT = "nullidcat.ns1.ns2.tbl"
+    withTestSession { session =>
+      withTestTableAndViews(session, nullIdT) {
+        session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect()
+
+        val df1 = session.table(nullIdT)
+        val catalog = getTableCatalog[TableCatalog](session, "nullidcat")
+        assert(catalog.loadTable(testIdent).id == null,
+          "NullTableIdInMemoryTableCatalog should produce null table IDs")
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+        externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+        val df2 = session.table(nullIdT)
+
+        if (isConnect) {
+          // Connect re-resolves both sides to the recreated table.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(2, 200, 2, 200)))
+        } else {
+          // Classic: column IDs changed.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+            matchPVals = true,
+            parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+        }
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join does not detect external table drop and recreate" +
+      " (table without table ID support and without column ID support)") {
+    val nullBothT = "nullbothidscat.ns1.ns2.tbl"
+    withTestSession { session =>
+      withTestTableAndViews(session, nullBothT) {
+        session.sql(s"CREATE TABLE $nullBothT (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $nullBothT VALUES (1, 100)").collect()
+
+        val df1 = session.table(nullBothT)
+        val catalog = getTableCatalog[TableCatalog](
+          session, "nullbothidscat")
+        assert(catalog.loadTable(testIdent).id == null,
+          "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null table IDs")
+        assert(catalog.loadTable(testIdent).columns().forall(_.id() == null),
+          "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null column IDs")
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+        externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+        val df2 = session.table(nullBothT)
+
+        if (isConnect) {
+          // Connect re-resolves both sides to the recreated table, so the join
+          // sees the row appended after recreate.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(2, 200, 2, 200)))
+        } else {
+          // Classic: neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires, so the
+          // drop and recreate goes undetected. df1 keeps its pre-drop snapshot
+          // (1, 100) while df2 reads the recreated table (2, 200), so the join finds
+          // no matching ids and returns no rows.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq.empty)
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Scenario 5: external drop+re-add column.
+  // 5a: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect
+  // 5b: no IDs, goes undetected, join succeeds (both modes)
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join after external drop+re-add column" +
+      " (table without table ID support, but with column ID support)") {
+    val nullIdT = "nullidcat.ns1.ns2.tbl"
+    withTestSession { session =>
+      withTestTableAndViews(session, nullIdT) {
+        session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect()
+
+        val df1 = session.table(nullIdT)
+
+        val catalog = getTableCatalog[TableCatalog](session, "nullidcat")
+        catalog.alterTable(
+          testIdent, TableChange.deleteColumn(Array("salary"), false))
+        catalog.alterTable(
+          testIdent, TableChange.addColumn(Array("salary"), IntegerType, true))
+
+        val df2 = session.table(nullIdT)
+
+        if (isConnect) {
+          // Connect re-resolves both sides with the new column ID.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(1, null, 1, null)))
+        } else {
+          // Classic: column ID changed.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+            matchPVals = true,
+            parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+        }
+      }
+    }
+  }
+
+  test(s"${testPrefix}SPARK-54157: join does not detect external drop+re-add column" +
+      " (table without table ID support and without column ID support)") {
+    val nullBothT = "nullbothidscat.ns1.ns2.tbl"
+    withTestSession { session =>
+      withTestTableAndViews(session, nullBothT) {
+        session.sql(s"CREATE TABLE $nullBothT (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $nullBothT VALUES (1, 100)").collect()
+
+        val df1 = session.table(nullBothT)
+
+        val catalog = getTableCatalog[TableCatalog](
+          session, "nullbothidscat")
+        catalog.alterTable(
+          testIdent, TableChange.deleteColumn(Array("salary"), false))
+        catalog.alterTable(
+          testIdent, TableChange.addColumn(Array("salary"), IntegerType, true))
+
+        val df2 = session.table(nullBothT)
+
+        // Neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires.
+        // The change goes undetected and the join succeeds.
+        checkRows(
+          df1.join(df2, df1("id") === df2("id")),
+          Seq(Row(1, null, 1, null)))
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Scenario 6: external type change (drop INT column, add STRING column).
+  // The delete removes the old column ID and the add assigns a fresh one,
+  // so the column ID check fires (COLUMN_ID_MISMATCH) in classic before schema
+  // validation gets a chance to compare data types.
+  // Connect re-resolves both sides with the new column ID.
+  // ---------------------------------------------------------------------------
+
+  test(s"${testPrefix}SPARK-54157: join after external drop+re-add different-type column" +
+      " (table with both table and column ID support)") {
+    withTestSession { session =>
+      withTestTableAndViews(session, testTable) {
+        session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+        session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+        val df1 = session.table(testTable)
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        catalog.alterTable(
+          testIdent, TableChange.deleteColumn(Array("salary"), false))
+        catalog.alterTable(
+          testIdent, TableChange.addColumn(Array("salary"), StringType, true))
+        externalAppend(catalog = catalog, ident = testIdent,
+          row = InternalRow(2, UTF8String.fromString("high")))
+
+        val df2 = session.table(testTable)
+
+        if (isConnect) {
+          // Connect re-resolves both sides with the new column type.
+          checkRows(
+            df1.join(df2, df1("id") === df2("id")),
+            Seq(Row(1, null, 1, null), Row(2, "high", 2, "high")))
+        } else {
+          // Classic: column ID changed.
+          checkError(
+            exception = intercept[AnalysisException] {
+              df1.join(df2, df1("id") === df2("id")).collect()
+            },
+            condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+            matchPVals = true,
+            parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 372be8cfe308..f272f28a5f92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkS
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -49,6 +49,7 @@ class DataSourceV2DataFrameSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
   with DSv2TempViewWithStoredPlanTests
   with DSv2RepeatedTableAccessTests
+  with DSv2IncrementallyConstructedQueryTests
   with DSv2CacheTableReadTests {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import testImplicits._
@@ -67,6 +68,9 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.nullcolidcat",
       classOf[NullColumnIdInMemoryTableCatalog].getName)
     .set("spark.sql.catalog.nullcolidcat.copyOnLoad", "true")
+    .set("spark.sql.catalog.nullbothidscat",
+      classOf[NullTableIdAndNullColumnIdInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.nullbothidscat.copyOnLoad", "true")
     .set("spark.sql.catalog.resetidcat",
       classOf[TypeChangeResetsColIdTableCatalog].getName)
     .set("spark.sql.catalog.resetidcat.copyOnLoad", "true")
@@ -92,6 +96,7 @@ class DataSourceV2DataFrameSuite
 
   // DSv2ExternalMutationTestBase implementations for classic mode
   override protected def testPrefix: String = ""
+  override protected def isConnect: Boolean = false
 
   override protected def withTestSession(fn: SparkSession => Unit): Unit = fn(spark)
 
@@ -2630,7 +2635,8 @@ class DataSourceV2DataFrameSuite
 
   // Column ID tests: Null column ID connector
 
-  // When a connector does not support column IDs, validation is skipped.
+  // When a connector does not support column IDs, validation is skipped, but version
+  // tracking still detects the schema change and refreshes the table reference.
   test("connector with null column IDs: drop+re-add column not detected") {
     val t = "nullcolidcat.ns1.ns2.tbl"
     val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
@@ -2646,8 +2652,9 @@ class DataSourceV2DataFrameSuite
       sql(s"ALTER TABLE $t DROP COLUMN salary")
       sql(s"ALTER TABLE $t ADD COLUMN salary INT")
 
-      // succeeds because column ID validation is skipped when IDs are null
-      checkAnswer(df, Seq(Row(1, 100)))
+      // No column ID error because IDs are null. The table version changed, so
+      // [[V2TableRefreshUtil]] reloads it and the re-added salary column has null values.
+      checkAnswer(df, Seq(Row(1, null)))
     }
   }
 
@@ -2695,8 +2702,9 @@ class DataSourceV2DataFrameSuite
         .find(_.name() == "salary").get
       assert(newSalaryCol.id() == null, "salary should have a null ID after re-add")
 
-      // succeeds because current column ID is null, so validation is skipped
-      checkAnswer(df, Seq(Row(1, 100)))
+      // No column ID error because current ID is null. The table is refreshed via
+      // version tracking, so the re-added salary column has null values.
+      checkAnswer(df, Seq(Row(1, null)))
     }
   }
 
@@ -2725,8 +2733,9 @@ class DataSourceV2DataFrameSuite
         .find(_.name() == "salary").get
       assert(newSalaryCol.id() != null, "salary should have a non-null ID after re-add")
 
-      // succeeds because original column ID is null, so validation is skipped
-      checkAnswer(df, Seq(Row(1, 100)))
+      // No column ID error because original ID is null. The table is refreshed via
+      // version tracking, so the re-added salary column has null values.
+      checkAnswer(df, Seq(Row(1, null)))
     }
   }
 

