This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 073d26c7889c [SPARK-55828][SQL] Add DSV2 TableChange toString and fix
missing error class for Merge Into Schema Evolution
073d26c7889c is described below
commit 073d26c7889c0c7719e04ce1a5a8cc03da1a37f6
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Mar 6 16:48:01 2026 +0800
[SPARK-55828][SQL] Add DSV2 TableChange toString and fix missing error
class for Merge Into Schema Evolution
### What changes were proposed in this pull request?
1. Add TableChange toString methods
2. Fix error class typo for MERGE INTO schema evolution
### Why are the changes needed?
Fixes a misleading error message ("Error class not found") raised by some defensive
handling in Merge Into Schema Evolution for certain DSV2 connector behavior.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to reproduce the issue.
### Was this patch authored or co-authored using generative AI tooling?
Yes, cursor
Closes #54627 from szehon-ho/merge_unsupported_test.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 4 +-
.../spark/sql/connector/catalog/TableChange.java | 41 +++++++++++++++
.../spark/sql/errors/QueryCompilationErrors.scala | 3 +-
.../InMemoryRowLevelOperationTableCatalog.scala | 28 ++++++++++
.../MergeIntoSchemaEvolutionExtraSQLTests.scala | 61 +++++++++++++++++++++-
.../v2/jdbc/JDBCTableCatalogSuite.scala | 2 +-
6 files changed, 134 insertions(+), 5 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 2a7c98b27af8..8f82244e1987 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7664,9 +7664,9 @@
},
"sqlState" : "0A000"
},
- "UNSUPPORTED_TABLE_CHANGE_IN_AUTO_SCHEMA_EVOLUTION" : {
+ "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION" : {
"message" : [
- "The table changes <changes> are not supported by the catalog on table
<tableName>."
+ "Operation could not apply the following schema changes to table
<tableName> because the catalog did not support or only partially applied them:
<changes>."
],
"sqlState" : "42000"
},
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 08754b18d6eb..e64fa5e64903 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -430,6 +430,10 @@ public interface TableChange {
String[] fieldNames();
}
+ static String fieldPath(String[] fieldNames) {
+ return String.join(".", fieldNames);
+ }
+
/**
* A TableChange to add a field. The implementation may need to back-fill
all the existing data
* to add this new column, or remember the column default value specified
here and let the reader
@@ -507,6 +511,11 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return "ADD COLUMN " + TableChange.fieldPath(fieldNames) + " " +
dataType.sql();
+ }
}
/**
@@ -550,6 +559,11 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return "RENAME COLUMN " + TableChange.fieldPath(fieldNames) + " TO " +
newName;
+ }
}
/**
@@ -592,6 +606,11 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " TYPE " +
newDataType.sql();
+ }
}
/**
@@ -634,6 +653,13 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return nullable
+ ? "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " DROP NOT
NULL"
+ : "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " SET NOT
NULL";
+ }
}
/**
@@ -676,6 +702,11 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " COMMENT";
+ }
}
/**
@@ -718,6 +749,11 @@ public interface TableChange {
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
+
+ @Override
+ public String toString() {
+ return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " " +
position.toString();
+ }
}
/**
@@ -809,6 +845,11 @@ public interface TableChange {
public int hashCode() {
return Arrays.hashCode(fieldNames);
}
+
+ @Override
+ public String toString() {
+ return "DROP COLUMN " + TableChange.fieldPath(fieldNames);
+ }
}
/** A TableChange to alter clustering columns for a table. */
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 494f0b07629b..e00dea31ba73 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3525,10 +3525,11 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
def unsupportedTableChangesInAutoSchemaEvolutionError(
changes: Array[TableChange], tableName: Seq[String]): Throwable = {
val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
+ val changesDesc = changes.map(_.toString).mkString("; ")
new AnalysisException(
errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
messageParameters = Map(
- "changes" -> changes.mkString(","), "tableName" ->
toSQLId(sanitizedTableName)))
+ "changes" -> changesDesc, "tableName" -> toSQLId(sanitizedTableName)))
}
def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index 08133668a6fb..bbb9041bab37 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -67,3 +67,31 @@ class InMemoryRowLevelOperationTableCatalog extends
InMemoryTableCatalog {
newTable
}
}
+
+/**
+ * A catalog that silently ignores schema changes in alterTable (e.g.
AddColumn).
+ */
+class PartialSchemaEvolutionCatalog extends
InMemoryRowLevelOperationTableCatalog {
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
+ // Only apply property changes; ignore all schema/column changes so that
the table
+ // schema remains unchanged. This simulates a catalog that accepts
alterTable but
+ // does not support some requested changes.
+ val propertyChanges = changes.filter {
+ case _: TableChange.SetProperty => true
+ case _: TableChange.RemoveProperty => true
+ case _ => false
+ }
+ val properties = CatalogV2Util.applyPropertiesChanges(table.properties,
propertyChanges)
+ val newTable = new InMemoryRowLevelOperationTable(
+ name = table.name,
+ schema = table.schema,
+ partitioning = table.partitioning,
+ properties = properties,
+ constraints = table.constraints)
+ newTable.alterTableWithData(table.data, table.schema)
+ tables.put(ident, newTable)
+ newTable
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
index f0475390cd12..3bbeb0db1b74 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
@@ -18,13 +18,18 @@
package org.apache.spark.sql.connector
import org.apache.spark.SparkRuntimeException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
PartialSchemaEvolutionCatalog, TableInfo}
+import
org.apache.spark.sql.connector.expressions.LogicalExpressions.{identity,
reference}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
// Tests that cannot re-use helper functions as they have custom logic
trait MergeIntoSchemaEvolutionExtraSQLTests extends RowLevelOperationSuiteBase
{
+ import testImplicits._
+
test("source missing struct field violating check constraints") {
Seq(true, false).foreach { withSchemaEvolution =>
Seq(true, false).foreach { coercionEnabled =>
@@ -89,5 +94,59 @@ trait MergeIntoSchemaEvolutionExtraSQLTests extends
RowLevelOperationSuiteBase {
}
}
}
+
+ test("error when catalog ignores schema changes in MERGE WITH SCHEMA
EVOLUTION") {
+ spark.conf.set("spark.sql.catalog.cat",
classOf[PartialSchemaEvolutionCatalog].getName)
+ spark.sessionState.catalogManager.reset()
+ withTable(tableNameAsString) {
+ withTempView("source") {
+ // Target: pk INT, salary INT, dep STRING
+ val targetData: DataFrame = Seq(
+ (1, 100, "hr"),
+ (2, 200, "software")
+ ).toDF("pk", "salary", "dep")
+ // Source: pk LONG (wider type), salary INT, dep STRING, active
BOOLEAN (new column)
+ // Triggers UpdateColumnType(pk) and AddColumn(active) in schema
evolution
+ val sourceData: DataFrame = Seq(
+ (1L, 150, "hr", true),
+ (3L, 350, "eng", false)
+ ).toDF("pk", "salary", "dep", "active")
+
+ val columns = CatalogV2Util.structTypeToV2Columns(targetData.schema)
+ val partitionCols = Seq("dep")
+ val transforms = Array[Transform](identity(reference(partitionCols)))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withPartitions(transforms)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(ident, tableInfo)
+ targetData.writeTo(tableNameAsString).append()
+ sourceData.createOrReplaceTempView("source")
+
+ val ex = intercept[AnalysisException] {
+ sql(
+ s"""MERGE WITH SCHEMA EVOLUTION
+ |INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN UPDATE SET dep = s.dep, active = s.active
+ |WHEN NOT MATCHED THEN INSERT (pk, salary, dep, active)
+ |VALUES (s.pk, s.salary, s.dep, s.active)
+ |""".stripMargin)
+ }
+ assert(ex.getCondition ===
"UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
+ s"Expected error class
UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION but got: " +
+ s"${ex.getCondition}. Message: ${ex.getMessage}")
+ assert(ex.getMessageParameters.get("tableName") != null,
+ s"Error message should mention table name: ${ex.getMessage}")
+
+ val msg = ex.getMessage
+ val expectedChanges = "ALTER COLUMN pk TYPE BIGINT; ADD COLUMN active
BOOLEAN"
+ assert(msg.contains(expectedChanges),
+ s"Error message should contain exact changes '$expectedChanges':
$msg")
+ }
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 2c6c70e1bc51..ad2571f402b8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -528,7 +528,7 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
},
condition = "UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG",
parameters = Map(
- "change" ->
"org.apache.spark.sql.connector.catalog.TableChange\\$UpdateColumnComment.*",
+ "change" -> "ALTER COLUMN .* COMMENT",
"tableName" -> "`test`.`alt_table`"
),
matchPVals = true)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]