This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 805b086d53e9 [SPARK-56618][SQL][TEST] Add DSv2 join refresh tests for
incrementally constructed queries
805b086d53e9 is described below
commit 805b086d53e9c4b400e0204f0f2013cd340d00db
Author: Thang Long Vu <[email protected]>
AuthorDate: Sun May 31 14:04:58 2026 +0800
[SPARK-56618][SQL][TEST] Add DSv2 join refresh tests for incrementally
constructed queries
### What changes were proposed in this pull request?
Add tests for incrementally constructed queries to
`DataSourceV2DataFrameSuite` and `DataSourceV2DataFrameConnectSuite`, along
with new test catalogs.
When DataFrames over the same table are analyzed at different times and then
joined (self-join), the refresh phase in QueryExecution must align all table
references to the same version. These tests verify 6 core scenarios, with
variants that differ in whether the catalog supports table IDs and column IDs:
- **Scenario 1** (external + same-session insert): Both sides of the
self-join see the latest version after an INSERT between df1 and df2 creation.
- **Scenario 2** (external + same-session ADD COLUMN): In classic mode, df1
keeps its original 2-column schema while df2 has the new 3-column schema; both
see the latest data. In Connect mode, both sides re-resolve with the new
3-column schema.
- **Scenario 3** (external + same-session DROP COLUMN): In classic mode,
removing a column that df1 references throws `COLUMNS_MISMATCH`. In Connect
mode, both sides re-resolve without the dropped column and the join succeeds.
- **Scenario 4a** (external drop/recreate, table with both table and column
ID support): In classic mode, drop+recreate produces a new table ID; join
throws `TABLE_ID_MISMATCH`. In Connect mode, both sides re-resolve to the
recreated table.
- **Scenario 4b** (external drop/recreate, table without table ID support,
but with column ID support): In classic mode, column IDs detect the recreate;
join throws `COLUMN_ID_MISMATCH`. In Connect mode, both sides re-resolve to the
recreated table.
- **Scenario 4c** (external drop/recreate, table without table ID support
and without column ID support): Neither ID check fires, so the drop/recreate
goes undetected. In classic mode, df1 keeps its pre-drop snapshot while df2
reads the recreated table, so the join finds no matching ids and returns no
rows. In Connect mode, both sides re-resolve to the recreated table and the
join sees the row appended after recreate.
- **Scenario 5a** (external drop+re-add column, table without table ID
support, but with column ID support): In classic mode, the drop and the re-add
are two separate alterTable calls, so the re-added column receives a fresh
column ID; join throws `COLUMN_ID_MISMATCH`. In Connect mode, both sides
re-resolve with the new column ID.
- **Scenario 5b** (external drop+re-add column, table without table ID
support and without column ID support): Neither ID check fires; join succeeds
(change goes undetected). Same behavior in both modes.
- **Scenario 6** (external type change, table with both table and column ID
support): In classic mode, the delete removes the old column ID and the add
assigns a fresh one, so the column ID check fires; join throws
`COLUMN_ID_MISMATCH`. In Connect mode, both sides re-resolve with the new
column type.
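The classic-mode outcomes above can be summarized with a small Scala model. This is a hypothetical sketch, not Spark's `V2TableRefreshUtil` or `V2TableUtil.validateColumnIds` logic: `RefreshCheckModel`, `Col`, `Snapshot`, `check`, `idSalary`, and the order in which the checks run are assumptions made for illustration. A `None` ID stands in for a catalog that returns `null`, which skips that comparison. Connect mode is not modeled, because both sides re-resolve there.

```scala
// Hypothetical model of the classic-mode refresh checks described above.
// Not Spark code: names and check order are illustrative assumptions.
object RefreshCheckModel {
  // A column as seen by a plan; id = None models a null column ID.
  final case class Col(name: String, dataType: String, id: Option[String])
  // A table snapshot; id = None models a catalog without table ID support.
  final case class Snapshot(id: Option[String], columns: Seq[Col])

  // IDs are compared only when both sides provide one; a null ID skips the check.
  private def idsDiffer(captured: Option[String], current: Option[String]): Boolean =
    (captured, current) match {
      case (Some(a), Some(b)) => a != b
      case _                  => false
    }

  /** Returns the error a refresh would raise, or None if the change goes undetected. */
  def check(captured: Snapshot, current: Snapshot): Option[String] = {
    val currentByName = current.columns.map(c => c.name -> c).toMap
    if (idsDiffer(captured.id, current.id)) {
      Some("TABLE_ID_MISMATCH") // e.g. scenario 4a: recreate assigned a new table ID
    } else if (!captured.columns.forall(c => currentByName.contains(c.name))) {
      Some("COLUMNS_MISMATCH") // e.g. scenario 3: a referenced column was dropped
    } else if (captured.columns.exists(c => idsDiffer(c.id, currentByName(c.name).id))) {
      Some("COLUMN_ID_MISMATCH") // e.g. scenarios 4b, 5a, 6: a column got a fresh ID
    } else {
      None // e.g. scenarios 4c, 5b: no IDs to compare, so nothing is detected
    }
  }

  // Builds an (id INT, salary ...) snapshot with the given table and column IDs.
  def idSalary(
      tableId: Option[String],
      idColId: Option[String],
      salaryColId: Option[String],
      salaryType: String = "int"): Snapshot =
    Snapshot(tableId, Seq(Col("id", "int", idColId), Col("salary", salaryType, salaryColId)))
}
```

With `RefreshCheckModel._` imported, `check(idSalary(Some("t1"), Some("c1"), Some("c2")), idSalary(Some("t2"), Some("c3"), Some("c4")))` returns `Some("TABLE_ID_MISMATCH")` (scenario 4a), while the same recreate with every ID set to `None` returns `None` (scenario 4c).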
#### Test architecture
All tests live in the shared `DSv2IncrementallyConstructedQueryTests` trait:
- **Scenarios 1, 5b** produce identical results in both classic and Connect
modes.
- **Scenarios 2, 3, 4a, 4b, 4c, 5a, 6** use `if (isConnect)` to assert the
correct expected result for each mode, since Connect defers resolution until
execution while classic preserves the schema captured at DataFrame creation
time.
- **`DataSourceV2DataFrameConnectSuite`** mixes in
`DSv2IncrementallyConstructedQueryTests` alongside the existing
`DSv2TempViewWithStoredPlanTests`, `DSv2RepeatedTableAccessTests`, and
`DSv2CacheTableReadTests` traits.
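The mode-specific branches follow from when each mode resolves a DataFrame. The Scala model below is a hypothetical sketch, not Spark's DataFrame or Connect client API; `ToyCatalog`, `EagerFrame`, `DeferredFrame`, and the scenario helpers are invented for illustration. It captures a table's columns either once at creation time, as classic mode does, or at execution time, as Connect does, which reproduces the 5-versus-6 column counts asserted for scenario 2 and shows why only the eager frame still references a dropped column in scenario 3.

```scala
// Hypothetical model of classic (eager) vs Connect (deferred) resolution.
// Not Spark code: every name here is invented for illustration.
object ResolutionModel {
  // A mutable stand-in for a catalog holding one table's column names.
  final class ToyCatalog(initial: Seq[String]) {
    private var cols: Seq[String] = initial
    def columns: Seq[String] = cols
    def addColumn(name: String): Unit = cols = cols :+ name
    def dropColumn(name: String): Unit = cols = cols.filterNot(_ == name)
  }

  sealed trait ToyFrame { def resolve(): Seq[String] }

  // Classic-style: the schema is captured once, when the frame is created.
  final class EagerFrame(catalog: ToyCatalog) extends ToyFrame {
    private val captured: Seq[String] = catalog.columns
    def resolve(): Seq[String] = captured
  }

  // Connect-style: only the table reference is kept; columns are looked up on execution.
  final class DeferredFrame(catalog: ToyCatalog) extends ToyFrame {
    def resolve(): Seq[String] = catalog.columns
  }

  private def newFrame(catalog: ToyCatalog, deferred: Boolean): ToyFrame =
    if (deferred) new DeferredFrame(catalog) else new EagerFrame(catalog)

  // Scenario 2: ADD COLUMN between creating df1 and df2; returns the self-join's width.
  def scenario2JoinWidth(deferred: Boolean): Int = {
    val catalog = new ToyCatalog(Seq("id", "salary"))
    val df1 = newFrame(catalog, deferred)
    catalog.addColumn("new_column")
    val df2 = newFrame(catalog, deferred)
    df1.resolve().length + df2.resolve().length
  }

  // Scenario 3: DROP COLUMN after creating df1; does df1 reference a missing column?
  def scenario3ReferencesDroppedColumn(deferred: Boolean): Boolean = {
    val catalog = new ToyCatalog(Seq("id", "salary"))
    val df1 = newFrame(catalog, deferred)
    catalog.dropColumn("salary")
    df1.resolve().exists(c => !catalog.columns.contains(c))
  }
}
```

In this model `scenario2JoinWidth(deferred = false)` returns 5 and `scenario2JoinWidth(deferred = true)` returns 6, mirroring the column-count assertions for classic and Connect mode respectively.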
#### Behavioral corrections in existing tests
Three existing null-column-ID tests had their expected results corrected from
`Row(1, 100)` to `Row(1, null)`. Version tracking now properly refreshes the
table after a column is dropped and re-added, so the re-added salary column
returns null for the existing row rather than the stale value. This behavior
change results from the `setVersionAndValidatedVersionFrom` addition described
below.
#### Shared test infrastructure changes
- **`NullTableIdAndNullColumnIdInMemoryTableCatalog`** (new): Test catalog
that returns `null` for both table ID and column ID, simulating catalogs
without ID support for scenarios 4c and 5b.
- **`InMemoryBaseTable`**: Added `setVersionAndValidatedVersionFrom`
method. Test catalogs that decorate tables (e.g.
`NullColumnIdInMemoryTableCatalog`,
`NullTableIdAndNullColumnIdInMemoryTableCatalog`) create new objects that start
at version 0. Without this call, `V2TableRefreshUtil` would see version 0 on
every load and never detect that the table has changed, breaking stale-table
refresh for incrementally constructed queries.
- **`InMemoryTable`**: Refactored `copy()` to use
`setVersionAndValidatedVersionFrom`, ensuring version-copy consistency.
- **`DSv2ExternalMutationTestBase`**: Added `isConnect` helper for
mode-specific assertions.
- **`TypeChangeResetsColIdTableCatalog`**,
**`NullColumnIdInMemoryTableCatalog`**, **`MixedColumnIdTableCatalog`**,
**`ComposedColumnIdTableCatalog`**, **`InMemoryRowLevelOperationTableCatalog`**
(modified): Added `setVersionAndValidatedVersionFrom` calls to propagate
version state consistently across all custom catalog table types.
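The version-propagation problem that `setVersionAndValidatedVersionFrom` addresses can be illustrated with a hypothetical, self-contained Scala model. `ToyTable`, `DecoratingCatalog`, `setVersionFrom`, and `refreshDetectsAppend` are invented names, not the real `InMemoryBaseTable` API, and the validated version is omitted for brevity. Each write bumps the version of the underlying table, after which the decorating catalog wraps the result in a fresh object; a version-based refresh check only notices the write if the wrapper inherits the version.

```scala
// Hypothetical model of why decorating catalogs must copy the version counter.
// Not Spark code: ToyTable and DecoratingCatalog are invented; validated version omitted.
object VersionModel {
  final class ToyTable(val rows: Seq[Int]) {
    private var tableVersion: Int = 0 // every new object starts at version 0
    def version: Int = tableVersion
    def increaseVersion(): Unit = tableVersion += 1
    // Analogous in spirit to setVersionAndValidatedVersionFrom.
    def setVersionFrom(source: ToyTable): Unit = tableVersion = source.version
  }

  // Replaces the stored table with a fresh decorator object after every change.
  final class DecoratingCatalog(propagateVersion: Boolean) {
    private var stored: ToyTable = new ToyTable(Seq.empty)
    def load(): ToyTable = stored

    def append(row: Int): Unit = {
      // The underlying write starts from the stored table and bumps its version...
      val written = new ToyTable(stored.rows :+ row)
      written.setVersionFrom(stored)
      written.increaseVersion()
      // ...then the catalog wraps it in a decorator, which starts at version 0.
      val decorated = new ToyTable(written.rows)
      if (propagateVersion) decorated.setVersionFrom(written)
      stored = decorated
    }
  }

  // A version-based staleness check: the table counts as changed only if the version moved.
  def refreshDetectsAppend(propagateVersion: Boolean): Boolean = {
    val catalog = new DecoratingCatalog(propagateVersion)
    val capturedVersion = catalog.load().version
    catalog.append(2)
    catalog.load().version != capturedVersion
  }
}
```

Without propagation the stored decorator reports version 0 after every append, so `refreshDetectsAppend(propagateVersion = false)` is `false`; copying the version makes the same check return `true`.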
### Why are the changes needed?
The existing tests covered scenarios 1-3 within larger multi-step tests but
did not cover:
- Table identity detection via table ID and column ID (scenarios 4a-4c)
- Column identity detection via column ID (scenarios 5a-5b)
- Type change detection via column ID (scenario 6)
- The case where neither table ID nor column ID is supported, showing the
limitation (scenarios 4c, 5b)
- The behavioral difference between classic and Connect modes for
incrementally constructed queries
These scenarios are important to verify the correctness of DSv2 table
refresh behavior for incrementally constructed queries.
### Does this PR introduce _any_ user-facing change?
No. This PR only adds tests and test-only catalogs.
### How was this patch tested?
New tests in `DSv2IncrementallyConstructedQueryTests` trait, run via
`DataSourceV2DataFrameSuite` (classic) and `DataSourceV2DataFrameConnectSuite`
(Connect).
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55463 from longvu-db/dsv2-pr3-joins.
Authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d76d7f5f6367518b4cb96b796a2c6ac83ec8303a)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalog/ComposedColumnIdTableCatalog.scala | 2 +
.../sql/connector/catalog/InMemoryBaseTable.scala | 19 +
.../InMemoryRowLevelOperationTableCatalog.scala | 2 +
.../sql/connector/catalog/InMemoryTable.scala | 5 +-
.../catalog/MixedColumnIdTableCatalog.scala | 2 +
.../catalog/NullColumnIdInMemoryTableCatalog.scala | 2 +
...bleIdAndNullColumnIdInMemoryTableCatalog.scala} | 61 ++-
.../TypeChangeResetsColIdTableCatalog.scala | 1 +
.../DataSourceV2DataFrameConnectSuite.scala | 19 +-
.../connector/DSv2ExternalMutationTestBase.scala | 5 +-
.../DSv2IncrementallyConstructedQueryTests.scala | 495 +++++++++++++++++++++
.../sql/connector/DataSourceV2DataFrameSuite.scala | 25 +-
12 files changed, 587 insertions(+), 51 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
index 0ef1a1970dea..64488a76db7f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala
@@ -96,6 +96,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
table.constraints,
id = table.id)
composedTable.alterTableWithData(table.data, table.schema)
+ composedTable.setVersionAndValidatedVersionFrom(table)
tables.put(ident, composedTable)
composedTable
}
@@ -157,6 +158,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
alteredTable.constraints,
id = alteredTable.id)
composedTable.alterTableWithData(alteredTable.data, alteredTable.schema)
+ composedTable.setVersionAndValidatedVersionFrom(alteredTable)
tables.put(ident, composedTable)
composedTable
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 06997662fd8b..6b09c0090de1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -97,6 +97,25 @@ abstract class InMemoryBaseTable(
tableVersion = version.toInt
}
+ /**
+ * Copies version and validated version from another table.
+ *
+ * Some test catalogs (e.g. [[NullColumnIdInMemoryTableCatalog]],
+ * [[NullTableIdAndNullColumnIdInMemoryTableCatalog]]) create a new table object
+ * that overrides specific behavior (such as nulling out column IDs). The new
+ * object's version counter starts at 0. Without this call, the version counter
+ * resets every time the catalog creates such a replacement table, breaking the
+ * monotonic-version assumption that downstream consumers rely on (e.g.
+ * [[InMemoryTable]].copy, validated-version propagation, and the join-refresh
+ * tests in [[DSv2IncrementallyConstructedQueryTests]]).
+ */
+ def setVersionAndValidatedVersionFrom(sourceTable: InMemoryBaseTable): Unit = {
+ setVersion(sourceTable.version())
+ if (sourceTable.validatedVersion() != null) {
+ setValidatedVersion(sourceTable.validatedVersion())
+ }
+ }
+
def increaseVersion(): Unit = {
tableVersion += 1
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index 81f976dce510..285a2891bc93 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -82,6 +82,7 @@ class InMemoryRowLevelOperationTableCatalog
constraints = constraints,
tableId = table.id)
newTable.alterTableWithData(table.data, schema)
+ newTable.setVersionAndValidatedVersionFrom(table)
tables.put(ident, newTable)
@@ -123,6 +124,7 @@ class PartialSchemaEvolutionCatalog extends InMemoryRowLevelOperationTableCatalo
properties = properties,
constraints = table.constraints)
newTable.alterTableWithData(table.data, table.schema)
+ newTable.setVersionAndValidatedVersionFrom(table)
tables.put(ident, newTable)
newTable
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 66db9c18fa98..c783bfbece14 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -158,10 +158,7 @@ class InMemoryTable(
copiedTable.commits ++= commits.map(_.copy())
- copiedTable.setVersion(version())
- if (validatedVersion() != null) {
- copiedTable.setValidatedVersion(validatedVersion())
- }
+ copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
index 70898c98afb8..f77cad3c077d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala
@@ -50,6 +50,7 @@ class MixedColumnIdTableCatalog extends InMemoryTableCatalog {
id = table.id,
nullIdNames = snapshot)
mixedTable.alterTableWithData(table.data, table.schema)
+ mixedTable.setVersionAndValidatedVersionFrom(table)
mixedTable
}
@@ -120,6 +121,7 @@ class MixedColumnIdInMemoryTable(
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
+ copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
index ee544f4da0e7..c26ce263c1f8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
@@ -45,6 +45,7 @@ class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
constraints = table.constraints,
id = table.id)
nullColIdTable.alterTableWithData(table.data, table.schema)
+ nullColIdTable.setVersionAndValidatedVersionFrom(table)
nullColIdTable
}
@@ -100,6 +101,7 @@ class NullColumnIdInMemoryTable(
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
+ copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
similarity index 58%
copy from sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
copy to sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
index ee544f4da0e7..df7964f63b85 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala
@@ -22,84 +22,79 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.internal.connector.ColumnImpl
/**
- * An [[InMemoryTableCatalog]] that strips column IDs from all columns
- * ([[Column.id]] returns null). This simulates connectors that do not
- * support column identity tracking.
+ * An [[InMemoryTableCatalog]] that strips both table IDs ([[Table.id]]
+ * returns null) and column IDs ([[Column.id]] returns null). This simulates
+ * connectors that support neither table nor column identity tracking.
*
- * Tables are stored as [[NullColumnIdInMemoryTable]] instances that
- * override [[columns]] to strip IDs. Data is copied from the table
- * created by the parent [[InMemoryTableCatalog]].
- *
- * When column IDs are null, [[V2TableUtil.validateColumnIds]]
- * skips validation entirely, meaning drop/re-add of a column is NOT
- * detected via column IDs.
+ * When both IDs are null, neither the table identity check in [[V2TableRefreshUtil]]
+ * nor [[V2TableUtil.validateColumnIds]] fires, so drop/recreate of a table or
+ * drop/re-add of a column goes undetected.
*/
-class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
+class NullTableIdAndNullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
- private def toNullColumnIdTable(table: InMemoryTable): NullColumnIdInMemoryTable = {
- val nullColIdTable = new NullColumnIdInMemoryTable(
+ private def toNullIdsTable(
+ table: InMemoryTable): NullTableIdAndNullColumnIdInMemoryTable = {
+ val nullTable = new NullTableIdAndNullColumnIdInMemoryTable(
name = table.name,
columns = table.columns(),
partitioning = table.partitioning,
properties = table.properties,
- constraints = table.constraints,
- id = table.id)
- nullColIdTable.alterTableWithData(table.data, table.schema)
- nullColIdTable
+ constraints = table.constraints)
+ nullTable.alterTableWithData(table.data, table.schema)
+ nullTable.setVersionAndValidatedVersionFrom(table)
+ nullTable
}
override def createTable(
ident: Identifier,
info: TableInfo): Table = {
val table = super.createTable(ident, info).asInstanceOf[InMemoryTable]
- val nullColIdTable = toNullColumnIdTable(table)
- tables.put(ident, nullColIdTable)
- nullColIdTable
+ val nullTable = toNullIdsTable(table)
+ tables.put(ident, nullTable)
+ nullTable
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val table = super.alterTable(ident, changes: _*).asInstanceOf[InMemoryTable]
- val nullColIdTable = toNullColumnIdTable(table)
- tables.put(ident, nullColIdTable)
- nullColIdTable
+ val nullTable = toNullIdsTable(table)
+ tables.put(ident, nullTable)
+ nullTable
}
}
/**
- * An [[InMemoryTable]] whose [[columns]] method always returns null
- * column IDs, simulating a connector that does not support column
- * identity tracking.
+ * An [[InMemoryTable]] with both null table ID and null column IDs,
+ * simulating a connector that supports neither identity tracking mechanism.
*/
-class NullColumnIdInMemoryTable(
+class NullTableIdAndNullColumnIdInMemoryTable(
name: String,
columns: Array[Column],
partitioning: Array[Transform],
properties: java.util.Map[String, String],
- constraints: Array[Constraint] = Array.empty,
- override val id: String = java.util.UUID.randomUUID().toString)
+ constraints: Array[Constraint] = Array.empty)
extends InMemoryTable(
name = name,
columns = columns,
partitioning = partitioning,
properties = properties,
constraints = constraints,
- id = id) {
+ id = null) {
override def columns(): Array[Column] = {
super.columns().map(_.asInstanceOf[ColumnImpl].copy(id = null))
}
override def copy(): Table = {
- val copiedTable = new NullColumnIdInMemoryTable(
+ val copiedTable = new NullTableIdAndNullColumnIdInMemoryTable(
name,
columns(),
partitioning,
properties,
- constraints,
- id)
+ constraints)
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
+ copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
index e1c54994c511..d68f2e62b136 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala
@@ -63,6 +63,7 @@ class TypeChangeResetsColIdTableCatalog extends InMemoryTableCatalog {
alteredTable.constraints,
id = alteredTable.id)
tableWithResetIds.alterTableWithData(alteredTable.data, alteredTable.schema)
+ tableWithResetIds.setVersionAndValidatedVersionFrom(alteredTable)
tables.put(ident, tableWithResetIds)
tableWithResetIds
}
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index ce926e42ebad..1a31e5f8ac1a 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,21 +21,23 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.{DSv2CacheTableReadTests, DSv2IncrementallyConstructedQueryTests, DSv2RepeatedTableAccessTests, DSv2TempViewWithStoredPlanTests}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, TableCatalog}
/**
* Connect-mode counterpart of [[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
*
* Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]), repeated table access tests
- * ([[DSv2RepeatedTableAccessTests]]), and CACHE TABLE read tests ([[DSv2CacheTableReadTests]])
- * under Spark Connect. All test logic lives in the shared traits; this class only provides the
- * Connect-specific session, catalog access, and result comparison.
+ * ([[DSv2RepeatedTableAccessTests]]), incrementally constructed query tests
+ * ([[DSv2IncrementallyConstructedQueryTests]]), and CACHE TABLE read tests
+ * ([[DSv2CacheTableReadTests]]) under Spark Connect. All test logic lives in the shared traits;
+ * this class only provides the Connect-specific session, catalog access, and result comparison.
*/
class DataSourceV2DataFrameConnectSuite
extends SparkConnectServerTest
with DSv2TempViewWithStoredPlanTests
with DSv2RepeatedTableAccessTests
+ with DSv2IncrementallyConstructedQueryTests
with DSv2CacheTableReadTests {
override def sparkConf: SparkConf = super.sparkConf
@@ -43,8 +45,15 @@ class DataSourceV2DataFrameConnectSuite
.set("spark.sql.catalog.testcat.copyOnLoad", "true")
.set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName)
.set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+ .set("spark.sql.catalog.nullidcat", classOf[NullTableIdInMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
+ .set(
+ "spark.sql.catalog.nullbothidscat",
+ classOf[NullTableIdAndNullColumnIdInMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.nullbothidscat.copyOnLoad", "true")
override protected def testPrefix: String = "[connect] "
+ override protected def isConnect: Boolean = true
override protected def withTestSession(fn: SparkSession => Unit): Unit =
withSession(fn)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 4591dca7abaf..0b2a50534447 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Iden
*
* Concrete suites override the abstract methods and mix in a test trait such as
* [[DSv2TempViewWithStoredPlanTests]], [[DSv2RepeatedTableAccessTests]],
- * or [[DSv2CacheTableReadTests]].
+ * [[DSv2IncrementallyConstructedQueryTests]], or [[DSv2CacheTableReadTests]].
*/
trait DSv2ExternalMutationTestBase extends QueryTest {
@@ -51,6 +51,9 @@ trait DSv2ExternalMutationTestBase extends QueryTest {
/** Prefix for test names, e.g. "" or "[connect] ". */
protected def testPrefix: String
+ /** Whether this suite runs under Spark Connect. */
+ protected def isConnect: Boolean
+
/** Execute a test body with a session. */
protected def withTestSession(fn: SparkSession => Unit): Unit
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala
new file mode 100644
index 000000000000..1dbaad18e3e7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{Column, InMemoryTableCatalog, TableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Tests for incrementally constructed queries where df1 and df2 are analyzed at different
+ * times, then joined. The refresh phase in QueryExecution must align table versions across
+ * all references.
+ *
+ * Classic and Connect modes produce different results in some scenarios because in Connect
+ * mode, resolution is deferred until execution, so both sides of a join always see the
+ * latest table state.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode `.collect()` on
+ * eager statements (DDL, INSERT) is a no-op, so this is harmless.
+ */
+trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBase {
+
+ // ---------------------------------------------------------------------------
+ // Scenario 1: join after insert refreshes both sides to latest version.
+ // Both classic and Connect see the inserted data.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join refreshes both sides after external insert" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(testTable)
+
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join refreshes both sides after same-session insert" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+
+ val df2 = session.table(testTable)
+
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 2: join after ADD COLUMN.
+ // Classic: df1 keeps its original 2-column schema.
+ // Connect: re-resolves df1 with the new 3-column schema.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external ADD COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.alterTable(
+ testIdent, TableChange.addColumn(Array("new_column"), IntegerType, true))
+ externalAppend(
+ catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
+
+ val df2 = session.table(testTable)
+ val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+ if (isConnect) {
+ // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+ assert(selfJoin.columns.length == 6,
+ s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+ } else {
+ // Classic: df1 keeps its original 2-column schema (id, salary).
+ assert(selfJoin.columns.length == 5,
+ s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join after same-session ADD COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column INT").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
+
+ val df2 = session.table(testTable)
+ val selfJoin = df1.join(df2, df1("id") === df2("id"))
+
+ if (isConnect) {
+ // Connect re-resolves df1 with the new 3-column schema (id, salary, new_column).
+ assert(selfJoin.columns.length == 6,
+ s"Expected 6 columns (3 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, null, 1, 100, null), Row(2, 200, -1, 2, 200, -1)))
+ } else {
+ // Classic: df1 keeps its original 2-column schema (id, salary).
+ assert(selfJoin.columns.length == 5,
+ s"Expected 5 columns (2 + 3) but got: ${selfJoin.columns.mkString(", ")}")
+ checkRows(selfJoin,
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 3: join after DROP COLUMN.
+ // Classic: df1 references the dropped column, fails with COLUMNS_MISMATCH.
+ // Connect: re-resolves df1 without the dropped column, join succeeds.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external DROP COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.alterTable(
+ testIdent, TableChange.deleteColumn(Array("salary"), false))
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2))
+
+ val df2 = session.table(testTable)
+
+ if (isConnect) {
+ // Connect re-resolves df1 without the dropped column.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 1), Row(2, 2)))
+ } else {
+ // Classic: df1 references the dropped column.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join after same-session DROP COLUMN" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2)").collect()
+
+ val df2 = session.table(testTable)
+
+ if (isConnect) {
+ // Connect re-resolves df1 without the dropped column.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 1), Row(2, 2)))
+ } else {
+ // Classic: df1 references the dropped column.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 4: external drop and recreate table.
+ // 4a: table ID detects it, TABLE_ID_MISMATCH in classic, succeeds in Connect
+ // 4b: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect
+ // 4c: no IDs, goes undetected, join succeeds in both modes (classic returns no rows)
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external table drop and recreate" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val originTableId = catalog.loadTable(testIdent).id
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(testTable)
+ val newTableId = catalog.loadTable(testIdent).id
+ assert(originTableId != newTableId)
+
+ if (isConnect) {
+ // Connect re-resolves both sides to the recreated table.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(2, 200, 2, 200)))
+ } else {
+ // Classic: table ID changed.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map(
+ "tableName" -> ".*",
+ "capturedTableId" -> ".*",
+ "currentTableId" -> ".*"))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join after external drop/recreate" +
+ " (table without table ID support, but with column ID support)") {
+ val nullIdT = "nullidcat.ns1.ns2.tbl"
+ withTestSession { session =>
+ withTestTableAndViews(session, nullIdT) {
+ session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect()
+
+ val df1 = session.table(nullIdT)
+ val catalog = getTableCatalog[TableCatalog](session, "nullidcat")
+ assert(catalog.loadTable(testIdent).id == null,
+ "NullTableIdInMemoryTableCatalog should produce null table IDs")
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(nullIdT)
+
+ if (isConnect) {
+ // Connect re-resolves both sides to the recreated table.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(2, 200, 2, 200)))
+ } else {
+ // Classic: column IDs changed.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join does not detect external table drop and recreate" +
+ " (table without table ID support and without column ID support)") {
+ val nullBothT = "nullbothidscat.ns1.ns2.tbl"
+ withTestSession { session =>
+ withTestTableAndViews(session, nullBothT) {
+ session.sql(s"CREATE TABLE $nullBothT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $nullBothT VALUES (1, 100)").collect()
+
+ val df1 = session.table(nullBothT)
+ val catalog = getTableCatalog[TableCatalog](
+ session, "nullbothidscat")
+ assert(catalog.loadTable(testIdent).id == null,
+ "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null table IDs")
+ assert(catalog.loadTable(testIdent).columns().forall(_.id() == null),
+ "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null column IDs")
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ val df2 = session.table(nullBothT)
+
+ if (isConnect) {
+ // Connect re-resolves both sides to the recreated table, so the join
+ // sees the row appended after recreate.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(2, 200, 2, 200)))
+ } else {
+ // Classic: neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires, so the
+ // drop and recreate goes undetected. df1 keeps its pre-drop snapshot
+ // (1, 100) while df2 reads the recreated table (2, 200), so the join finds
+ // no matching ids and returns no rows.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq.empty)
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 5: external drop+re-add column.
+ // 5a: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect
+ // 5b: no IDs, goes undetected, join succeeds (both modes)
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external drop+re-add column" +
+ " (table without table ID support, but with column ID support)") {
+ val nullIdT = "nullidcat.ns1.ns2.tbl"
+ withTestSession { session =>
+ withTestTableAndViews(session, nullIdT) {
+ session.sql(s"CREATE TABLE $nullIdT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $nullIdT VALUES (1, 100)").collect()
+
+ val df1 = session.table(nullIdT)
+
+ val catalog = getTableCatalog[TableCatalog](session, "nullidcat")
+ catalog.alterTable(
+ testIdent, TableChange.deleteColumn(Array("salary"), false))
+ catalog.alterTable(
+ testIdent, TableChange.addColumn(Array("salary"), IntegerType, true))
+
+ val df2 = session.table(nullIdT)
+
+ if (isConnect) {
+ // Connect re-resolves both sides with the new column ID.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, null, 1, null)))
+ } else {
+ // Classic: column ID changed.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+ }
+ }
+ }
+ }
+
+ test(s"${testPrefix}SPARK-54157: join does not detect external drop+re-add column" +
+ " (table without table ID support and without column ID support)") {
+ val nullBothT = "nullbothidscat.ns1.ns2.tbl"
+ withTestSession { session =>
+ withTestTableAndViews(session, nullBothT) {
+ session.sql(s"CREATE TABLE $nullBothT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $nullBothT VALUES (1, 100)").collect()
+
+ val df1 = session.table(nullBothT)
+
+ val catalog = getTableCatalog[TableCatalog](
+ session, "nullbothidscat")
+ catalog.alterTable(
+ testIdent, TableChange.deleteColumn(Array("salary"), false))
+ catalog.alterTable(
+ testIdent, TableChange.addColumn(Array("salary"), IntegerType, true))
+
+ val df2 = session.table(nullBothT)
+
+ // Neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires.
+ // The change goes undetected and the join succeeds.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, null, 1, null)))
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Scenario 6: external type change (drop INT column, add STRING column).
+ // The delete removes the old column ID and the add assigns a fresh one,
+ // so the column ID check fires (COLUMN_ID_MISMATCH) in classic before schema
+ // validation gets a chance to compare data types.
+ // Connect re-resolves both sides with the new column ID.
+ // ---------------------------------------------------------------------------
+
+ test(s"${testPrefix}SPARK-54157: join after external drop+re-add different-type column" +
+ " (table with both table and column ID support)") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+
+ val df1 = session.table(testTable)
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.alterTable(
+ testIdent, TableChange.deleteColumn(Array("salary"), false))
+ catalog.alterTable(
+ testIdent, TableChange.addColumn(Array("salary"), StringType, true))
+ externalAppend(catalog = catalog, ident = testIdent,
+ row = InternalRow(2, UTF8String.fromString("high")))
+
+ val df2 = session.table(testTable)
+
+ if (isConnect) {
+ // Connect re-resolves both sides with the new column type.
+ checkRows(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, null, 1, null), Row(2, "high", 2, "high")))
+ } else {
+ // Classic: column ID changed.
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> "(?s).*"))
+ }
+ }
+ }
+ }
+}
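The classic-mode expectations in scenarios 2 to 6 are consistent with a single check order: table ID first, then column IDs, then the names and types of the columns captured at analysis time. The Scala sketch below models only that order, under the assumption that a dropped column is left to the schema check rather than the column ID check. `RefreshCheckSketch`, `Col`, `Snapshot`, and `validate` are hypothetical names invented for illustration, not Spark classes, and the real refresh phase in QueryExecution may be structured differently. Connect mode is not modeled, since it re-resolves both sides instead.

```scala
// Hypothetical model of the classic-mode checks the tests above assert.
// These types are invented for illustration; they are not Spark classes.
object RefreshCheckSketch {
  // A column: name, type name, and an optional ID (None = connector returns null).
  final case class Col(name: String, dataType: String, id: Option[String])

  // A table snapshot: optional table ID (None = null table ID) plus its columns.
  final case class Snapshot(tableId: Option[String], columns: Seq[Col])

  // True only when both IDs are known and differ; a missing ID skips the check.
  private def idsDiffer(a: Option[String], b: Option[String]): Boolean =
    a.isDefined && b.isDefined && a != b

  // Compares the snapshot captured at analysis time with the current one and
  // returns the first error in the order the tests imply: table ID, then
  // column IDs, then the captured columns' names and types.
  def validate(captured: Snapshot, current: Snapshot): Option[String] = {
    val currentByName = current.columns.map(c => c.name -> c).toMap
    val columnIdChanged = captured.columns.exists { c =>
      // A dropped column has no current ID, so it is left to the schema check.
      idsDiffer(c.id, currentByName.get(c.name).flatMap(_.id))
    }
    val schemaChanged = captured.columns.exists { c =>
      !currentByName.get(c.name).exists(_.dataType == c.dataType)
    }
    if (idsDiffer(captured.tableId, current.tableId)) Some("TABLE_ID_MISMATCH")
    else if (columnIdChanged) Some("COLUMN_ID_MISMATCH")
    else if (schemaChanged) Some("COLUMNS_MISMATCH")
    else None
  }
}
```

With the scenario 6 shape (same `salary` name, new type, new ID) the sketch reports `COLUMN_ID_MISMATCH` before it compares types, and with every ID set to `None` a drop and recreate with an identical schema returns `None`, the undetected case of scenarios 4c and 5b.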
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 372be8cfe308..f272f28a5f92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkS
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
@@ -49,6 +49,7 @@ class DataSourceV2DataFrameSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
with DSv2TempViewWithStoredPlanTests
with DSv2RepeatedTableAccessTests
+ with DSv2IncrementallyConstructedQueryTests
with DSv2CacheTableReadTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._
@@ -67,6 +68,9 @@ class DataSourceV2DataFrameSuite
.set("spark.sql.catalog.nullcolidcat",
classOf[NullColumnIdInMemoryTableCatalog].getName)
.set("spark.sql.catalog.nullcolidcat.copyOnLoad", "true")
+ .set("spark.sql.catalog.nullbothidscat",
+ classOf[NullTableIdAndNullColumnIdInMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.nullbothidscat.copyOnLoad", "true")
.set("spark.sql.catalog.resetidcat",
classOf[TypeChangeResetsColIdTableCatalog].getName)
.set("spark.sql.catalog.resetidcat.copyOnLoad", "true")
@@ -92,6 +96,7 @@ class DataSourceV2DataFrameSuite
// DSv2ExternalMutationTestBase implementations for classic mode
override protected def testPrefix: String = ""
+ override protected def isConnect: Boolean = false
override protected def withTestSession(fn: SparkSession => Unit): Unit =
fn(spark)
@@ -2630,7 +2635,8 @@ class DataSourceV2DataFrameSuite
// Column ID tests: Null column ID connector
- // When a connector does not support column IDs, validation is skipped.
+ // When a connector does not support column IDs, validation is skipped, but version
+ // tracking still detects the schema change and refreshes the table reference.
test("connector with null column IDs: drop+re-add column not detected") {
val t = "nullcolidcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
@@ -2646,8 +2652,9 @@ class DataSourceV2DataFrameSuite
sql(s"ALTER TABLE $t DROP COLUMN salary")
sql(s"ALTER TABLE $t ADD COLUMN salary INT")
- // succeeds because column ID validation is skipped when IDs are null
- checkAnswer(df, Seq(Row(1, 100)))
+ // No column ID error because IDs are null. The table version changed, so
+ // [[V2TableRefreshUtil]] reloads it and the re-added salary column has null values.
+ checkAnswer(df, Seq(Row(1, null)))
}
}
@@ -2695,8 +2702,9 @@ class DataSourceV2DataFrameSuite
.find(_.name() == "salary").get
assert(newSalaryCol.id() == null, "salary should have a null ID after re-add")
- // succeeds because current column ID is null, so validation is skipped
- checkAnswer(df, Seq(Row(1, 100)))
+ // No column ID error because current ID is null. The table is refreshed via
+ // version tracking, so the re-added salary column has null values.
+ checkAnswer(df, Seq(Row(1, null)))
}
}
@@ -2725,8 +2733,9 @@ class DataSourceV2DataFrameSuite
.find(_.name() == "salary").get
assert(newSalaryCol.id() != null, "salary should have a non-null ID after re-add")
- // succeeds because original column ID is null, so validation is skipped
- checkAnswer(df, Seq(Row(1, 100)))
+ // No column ID error because original ID is null. The table is refreshed via
+ // version tracking, so the re-added salary column has null values.
+ checkAnswer(df, Seq(Row(1, null)))
}
}
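The updated comments separate two mechanisms: ID validation is skipped when IDs are null, yet a version change still makes the table reference reload. Below is a minimal Scala sketch of the version half only, assuming a reload simply means reading the latest rows through the columns captured at analysis time. `VersionRefreshSketch`, `VTable`, `refresh`, and `read` are hypothetical names and do not reflect the actual `V2TableRefreshUtil` API.

```scala
// Hypothetical model of version-based refresh for connectors with null IDs.
// Names are invented for this sketch; this is not the V2TableRefreshUtil API.
object VersionRefreshSketch {
  // A versioned table; each row maps a column name to a value (None = NULL).
  final case class VTable(
      version: Long,
      columns: Seq[String],
      rows: Seq[Map[String, Option[Int]]])

  // Keep the captured table when its version is unchanged, otherwise reload.
  def refresh(captured: VTable, latest: VTable): VTable =
    if (captured.version == latest.version) captured else latest

  // Project the columns captured at analysis time out of the (possibly
  // reloaded) table; a column absent from a row reads as NULL.
  def read(captured: VTable, latest: VTable): Seq[Seq[Option[Int]]] =
    refresh(captured, latest).rows.map { row =>
      captured.columns.map(c => row.getOrElse(c, None))
    }
}
```

Reading a captured version 1 that held `(1, 100)` against a version 3 whose re-added `salary` has no values gives `Seq(Seq(Some(1), None))`, the analogue of the `Row(1, null)` the tests now expect.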