This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 073d26c7889c [SPARK-55828][SQL] Add DSV2 TableChange toString and fix 
missing error class for Merge Into Schema Evolution
073d26c7889c is described below

commit 073d26c7889c0c7719e04ce1a5a8cc03da1a37f6
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Mar 6 16:48:01 2026 +0800

    [SPARK-55828][SQL] Add DSV2 TableChange toString and fix missing error 
class for Merge Into Schema Evolution
    
    ### What changes were proposed in this pull request?
    1. Add TableChange toString methods
    2. Fix error class typo for MERGE INTO schema evolution
    
    ### Why are the changes needed?
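    Fix a misleading "error class not found" failure raised by the defensive
    check in MERGE INTO schema evolution, which fires when a DSV2 catalog
    ignores or only partially applies the requested table changes. The error
    condition name declared in error-conditions.json did not match the name
    referenced in QueryCompilationErrors.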
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test that reproduces the issue with a catalog that ignores schema changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, cursor
    
    Closes #54627 from szehon-ho/merge_unsupported_test.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  4 +-
 .../spark/sql/connector/catalog/TableChange.java   | 41 +++++++++++++++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  3 +-
 .../InMemoryRowLevelOperationTableCatalog.scala    | 28 ++++++++++
 .../MergeIntoSchemaEvolutionExtraSQLTests.scala    | 61 +++++++++++++++++++++-
 .../v2/jdbc/JDBCTableCatalogSuite.scala            |  2 +-
 6 files changed, 134 insertions(+), 5 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 2a7c98b27af8..8f82244e1987 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7664,9 +7664,9 @@
     },
     "sqlState" : "0A000"
   },
-  "UNSUPPORTED_TABLE_CHANGE_IN_AUTO_SCHEMA_EVOLUTION" : {
+  "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION" : {
     "message" : [
-      "The table changes <changes> are not supported by the catalog on table 
<tableName>."
+      "Operation could not apply the following schema changes to table 
<tableName> because the catalog did not support or only partially applied them: 
<changes>."
     ],
     "sqlState" : "42000"
   },
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 08754b18d6eb..e64fa5e64903 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -430,6 +430,10 @@ public interface TableChange {
     String[] fieldNames();
   }
 
+  static String fieldPath(String[] fieldNames) {
+    return String.join(".", fieldNames);
+  }
+
   /**
    * A TableChange to add a field. The implementation may need to back-fill 
all the existing data
    * to add this new column, or remember the column default value specified 
here and let the reader
@@ -507,6 +511,11 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "ADD COLUMN " + TableChange.fieldPath(fieldNames) + " " + 
dataType.sql();
+    }
   }
 
   /**
@@ -550,6 +559,11 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "RENAME COLUMN " + TableChange.fieldPath(fieldNames) + " TO " + 
newName;
+    }
   }
 
   /**
@@ -592,6 +606,11 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " TYPE " + 
newDataType.sql();
+    }
   }
 
   /**
@@ -634,6 +653,13 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return nullable
+          ? "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " DROP NOT 
NULL"
+          : "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " SET NOT 
NULL";
+    }
   }
 
   /**
@@ -676,6 +702,11 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " COMMENT";
+    }
   }
 
   /**
@@ -718,6 +749,11 @@ public interface TableChange {
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "ALTER COLUMN " + TableChange.fieldPath(fieldNames) + " " + 
position.toString();
+    }
   }
 
   /**
@@ -809,6 +845,11 @@ public interface TableChange {
     public int hashCode() {
       return Arrays.hashCode(fieldNames);
     }
+
+    @Override
+    public String toString() {
+      return "DROP COLUMN " + TableChange.fieldPath(fieldNames);
+    }
   }
 
   /** A TableChange to alter clustering columns for a table. */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 494f0b07629b..e00dea31ba73 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3525,10 +3525,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
   def unsupportedTableChangesInAutoSchemaEvolutionError(
       changes: Array[TableChange], tableName: Seq[String]): Throwable = {
     val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
+    val changesDesc = changes.map(_.toString).mkString("; ")
     new AnalysisException(
       errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
       messageParameters = Map(
-        "changes" -> changes.mkString(","), "tableName" -> 
toSQLId(sanitizedTableName)))
+        "changes" -> changesDesc, "tableName" -> toSQLId(sanitizedTableName)))
   }
 
   def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index 08133668a6fb..bbb9041bab37 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -67,3 +67,31 @@ class InMemoryRowLevelOperationTableCatalog extends 
InMemoryTableCatalog {
     newTable
   }
 }
+
+/**
+ * A catalog that silently ignores schema changes in alterTable (e.g. 
AddColumn).
+ */
+class PartialSchemaEvolutionCatalog extends 
InMemoryRowLevelOperationTableCatalog {
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
+    // Only apply property changes; ignore all schema/column changes so that 
the table
+    // schema remains unchanged. This simulates a catalog that accepts 
alterTable but
+    // does not support some requested changes.
+    val propertyChanges = changes.filter {
+      case _: TableChange.SetProperty => true
+      case _: TableChange.RemoveProperty => true
+      case _ => false
+    }
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, 
propertyChanges)
+    val newTable = new InMemoryRowLevelOperationTable(
+      name = table.name,
+      schema = table.schema,
+      partitioning = table.partitioning,
+      properties = properties,
+      constraints = table.constraints)
+    newTable.alterTableWithData(table.data, table.schema)
+    tables.put(ident, newTable)
+    newTable
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
index f0475390cd12..3bbeb0db1b74 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
@@ -18,13 +18,18 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.SparkRuntimeException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
PartialSchemaEvolutionCatalog, TableInfo}
+import 
org.apache.spark.sql.connector.expressions.LogicalExpressions.{identity, 
reference}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 // Tests that cannot re-use helper functions as they have custom logic
 trait MergeIntoSchemaEvolutionExtraSQLTests extends RowLevelOperationSuiteBase 
{
 
+  import testImplicits._
+
   test("source missing struct field violating check constraints") {
     Seq(true, false).foreach { withSchemaEvolution =>
       Seq(true, false).foreach { coercionEnabled =>
@@ -89,5 +94,59 @@ trait MergeIntoSchemaEvolutionExtraSQLTests extends 
RowLevelOperationSuiteBase {
       }
     }
   }
+
+  test("error when catalog ignores schema changes in MERGE WITH SCHEMA 
EVOLUTION") {
+    spark.conf.set("spark.sql.catalog.cat", 
classOf[PartialSchemaEvolutionCatalog].getName)
+    spark.sessionState.catalogManager.reset()
+    withTable(tableNameAsString) {
+      withTempView("source") {
+        // Target: pk INT, salary INT, dep STRING
+        val targetData: DataFrame = Seq(
+          (1, 100, "hr"),
+          (2, 200, "software")
+        ).toDF("pk", "salary", "dep")
+        // Source: pk LONG (wider type), salary INT, dep STRING, active 
BOOLEAN (new column)
+        // Triggers UpdateColumnType(pk) and AddColumn(active) in schema 
evolution
+        val sourceData: DataFrame = Seq(
+          (1L, 150, "hr", true),
+          (3L, 350, "eng", false)
+        ).toDF("pk", "salary", "dep", "active")
+
+        val columns = CatalogV2Util.structTypeToV2Columns(targetData.schema)
+        val partitionCols = Seq("dep")
+        val transforms = Array[Transform](identity(reference(partitionCols)))
+        val tableInfo = new TableInfo.Builder()
+          .withColumns(columns)
+          .withPartitions(transforms)
+          .withProperties(extraTableProps)
+          .build()
+        catalog.createTable(ident, tableInfo)
+        targetData.writeTo(tableNameAsString).append()
+        sourceData.createOrReplaceTempView("source")
+
+        val ex = intercept[AnalysisException] {
+          sql(
+            s"""MERGE WITH SCHEMA EVOLUTION
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN UPDATE SET dep = s.dep, active = s.active
+               |WHEN NOT MATCHED THEN INSERT (pk, salary, dep, active)
+               |VALUES (s.pk, s.salary, s.dep, s.active)
+               |""".stripMargin)
+        }
+        assert(ex.getCondition === 
"UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
+          s"Expected error class 
UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION but got: " +
+            s"${ex.getCondition}. Message: ${ex.getMessage}")
+        assert(ex.getMessageParameters.get("tableName") != null,
+          s"Error message should mention table name: ${ex.getMessage}")
+
+        val msg = ex.getMessage
+        val expectedChanges = "ALTER COLUMN pk TYPE BIGINT; ADD COLUMN active 
BOOLEAN"
+        assert(msg.contains(expectedChanges),
+          s"Error message should contain exact changes '$expectedChanges': 
$msg")
+      }
+    }
+  }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 2c6c70e1bc51..ad2571f402b8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -528,7 +528,7 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
         },
         condition = "UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG",
         parameters = Map(
-          "change" -> 
"org.apache.spark.sql.connector.catalog.TableChange\\$UpdateColumnComment.*",
+          "change" -> "ALTER COLUMN .* COMMENT",
           "tableName" -> "`test`.`alt_table`"
         ),
         matchPVals = true)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to