This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 22e5d886f8dd Revert "[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause"
22e5d886f8dd is described below

commit 22e5d886f8dd09bf21479aea6b9b7f91ca67730a
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Dec 5 19:21:43 2025 -0800

    Revert "[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause"
    
    This reverts commit 16a3f6c36b9dda0f009dd3e9e9a8fcb002fce9d8.
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   10 +-
 .../ResolveRowLevelCommandAssignments.scala        |    2 +-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 1381 +++++++++++---------
 .../execution/command/PlanResolutionSuite.scala    |   98 +-
 4 files changed, 783 insertions(+), 708 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 23d32dd87db1..08c31939f161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1736,8 +1736,9 @@ class Analyzer(
                     Assignment(key, sourceAttr)
                   }
                 } else {
-                  targetTable.output.map { attr =>
-                    Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+                  sourceTable.output.flatMap { sourceAttr =>
+                    findAttrInTarget(sourceAttr.name).map(
+                      targetAttr => Assignment(targetAttr, sourceAttr))
                   }
                 }
                 UpdateAction(
@@ -1774,8 +1775,9 @@ class Analyzer(
                     Assignment(key, sourceAttr)
                   }
                 } else {
-                  targetTable.output.map { attr =>
-                    Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+                  sourceTable.output.flatMap { sourceAttr =>
+                    findAttrInTarget(sourceAttr.name).map(
+                      targetAttr => Assignment(targetAttr, sourceAttr))
                   }
                 }
                 InsertAction(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index bf1016ba8268..d1b8eab13191 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -53,7 +53,7 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
     case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && 
m.rewritable && !m.aligned &&
       !m.needSchemaEvolution =>
       validateStoreAssignmentPolicy()
-      val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && 
m.withSchemaEvolution
+      val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes
       m.copy(
         targetTable = cleanAttrMetadata(m.targetTable),
         matchedActions = alignActions(m.targetTable.output, m.matchedActions,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 611daef36db0..e59b4435c408 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -2404,7 +2404,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         sourceDF.createOrReplaceTempView("source")
 
         val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-        val mergeStmt = s"""MERGE $schemaEvolutionClause
+        sql(s"""MERGE $schemaEvolutionClause
                            |INTO $tableNameAsString t
                            |USING source s
                            |ON t.pk = s.pk
@@ -2412,9 +2412,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                            | UPDATE SET *
                            |WHEN NOT MATCHED THEN
                            | INSERT *
-                           |""".stripMargin
+                           |""".stripMargin)
         if (withSchemaEvolution && schemaEvolutionEnabled) {
-          sql(mergeStmt)
           checkAnswer(
             sql(s"SELECT * FROM $tableNameAsString"),
             Seq(
@@ -2425,12 +2424,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
               Row(5, 250, "executive", true),
               Row(6, 350, null, false)))
         } else {
-          val e = intercept[org.apache.spark.sql.AnalysisException] {
-            sql(mergeStmt)
-          }
-          assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
-          assert(e.message.contains("A column, variable, or function parameter 
with name " +
-            "`dep` cannot be resolved"))
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr"),
+              Row(2, 200, "software"),
+              Row(3, 300, "hr"),
+              Row(4, 150, "marketing"),
+              Row(5, 250, "executive"),
+              Row(6, 350, null)))
         }
         sql(s"DROP TABLE $tableNameAsString")
       }
@@ -2461,7 +2463,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           sourceDF.createOrReplaceTempView("source")
 
           val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-          val mergeStmt = s"""MERGE $schemaEvolutionClause
+          sql(s"""MERGE $schemaEvolutionClause
                  |INTO $tableNameAsString t
                  |USING source s
                  |ON t.pk = s.pk
@@ -2469,9 +2471,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  | UPDATE SET *
                  |WHEN NOT MATCHED THEN
                  | INSERT *
-                 |""".stripMargin
+                 |""".stripMargin)
           if (withSchemaEvolution && schemaEvolutionEnabled) {
-            sql(mergeStmt)
             checkAnswer(
               sql(s"SELECT * FROM $tableNameAsString"),
               Seq(
@@ -2482,12 +2483,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                 Row(5, 250, "executive", true),
                 Row(6, 350, "unknown", false)))
           } else {
-            val e = intercept[org.apache.spark.sql.AnalysisException] {
-              sql(mergeStmt)
-            }
-            assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
-            assert(e.getMessage.contains("A column, variable, or function 
parameter with name " +
-              "`dep` cannot be resolved"))
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(1, 100, "hr"),
+                Row(2, 200, "software"),
+                Row(3, 300, "hr"),
+                Row(4, 150, "marketing"),
+                Row(5, 250, "executive"),
+                Row(6, 350, "unknown")))
           }
           sql(s"DROP TABLE $tableNameAsString")
         }
@@ -3235,13 +3239,23 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  |WHEN NOT MATCHED THEN
                  | INSERT *
                  |""".stripMargin
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
-                  Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(
+                    Row(1, Row(10, Row(null, Map("c" -> "d"), false)), 
"sales"),
+                    Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
@@ -3321,18 +3335,30 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  |WHEN NOT MATCHED THEN
                  | INSERT *
                  |""".stripMargin
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
-                  Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(
+                    Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+                    Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `s`.`c2`.`a`"))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3508,18 +3534,30 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  | INSERT *
                  |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), 
"hr"),
-                  Row(1, Map(Row(10, null, true) -> Row("y", null, false)), 
"sales"),
-                  Row(2, Map(Row(20, null, false) -> Row("z", null, true)), 
"engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), 
"hr"),
+                    Row(1, Map(Row(10, null, true) -> Row("y", null, false)), 
"sales"),
+                    Row(2, Map(Row(20, null, false) -> Row("z", null, true)), 
"engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `m`.`key`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `m`.`key`.`c2`"))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3574,18 +3612,30 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  | INSERT (pk, m, dep) VALUES (src.pk, src.m, 'my_new_dep')
                  |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), 
"hr"),
-                  Row(1, Map(Row(10, null, true) -> Row("y", null, false)), 
"my_old_dep"),
-                  Row(2, Map(Row(20, null, false) -> Row("z", null, true)), 
"my_new_dep")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), 
"hr"),
+                    Row(1, Map(Row(10, null, true) -> Row("y", null, false)), 
"my_old_dep"),
+                    Row(2, Map(Row(20, null, false) -> Row("z", null, true)), 
"my_new_dep")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `m`.`key`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `m`.`key`.`c2`"))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3638,18 +3688,30 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  | INSERT *
                  |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(Row(0, Array(Row(10, 10, null)), "hr"),
-                  Row(1, Array(Row(10, null, true)), "sales"),
-                  Row(2, Array(Row(20, null, false)), "engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+                    Row(1, Array(Row(10, null, true)), "sales"),
+                    Row(2, Array(Row(20, null, false)), "engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct 
`a`.`element`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `a`.`element`.`c2`"))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3702,18 +3764,30 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                  | INSERT (pk, a, dep) VALUES (src.pk, src.a, 'my_new_dep')
                  |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(Row(0, Array(Row(10, 10, null)), "hr"),
-                  Row(1, Array(Row(10, null, true)), "my_old_dep"),
-                  Row(2, Array(Row(20, null, false)), "my_new_dep")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+                    Row(1, Array(Row(10, null, true)), "my_old_dep"),
+                    Row(2, Array(Row(20, null, false)), "my_new_dep")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct 
`a`.`element`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `a`.`element`.`c2`"))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3977,8 +4051,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
             sql(mergeStmt)
           }
           assert(exception.errorClass.get == 
"UNRESOLVED_COLUMN.WITH_SUGGESTION")
-          assert(exception.message.contains(" A column, variable, or function 
parameter with name "
-            + "`bonus` cannot be resolved"))
         }
 
         sql(s"DROP TABLE $tableNameAsString")
@@ -4412,321 +4484,267 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
   }
 
   test("merge into with source missing fields in struct nested in array") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            // Target table has struct with 3 fields (c1, c2, c3) in array
-            createAndInitTable(
-              s"""pk INT NOT NULL,
-                 |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
-                 |dep STRING""".stripMargin,
-              """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], 
"dep": "sales" }
-                 |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ], 
"dep": "sales" }"""
-                .stripMargin)
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          // Target table has struct with 3 fields (c1, c2, c3) in array
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], "dep": 
"sales" }
+               |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ], 
"dep": "sales" }"""
+              .stripMargin)
 
-            // Source table has struct with only 2 fields (c1, c2) - missing c3
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("a", ArrayType(
-                StructType(Seq(
-                  StructField("c1", IntegerType),
-                  StructField("c2", StringType))))), // missing c3 field
-              StructField("dep", StringType)))
-            val data = Seq(
-              Row(1, Array(Row(10, "c")), "hr"),
-              Row(2, Array(Row(30, "e")), "engineering")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
+          // Source table has struct with only 2 fields (c1, c2) - missing c3
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("a", ArrayType(
+              StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StringType))))), // missing c3 field
+            StructField("dep", StringType)))
+          val data = Seq(
+            Row(1, Array(Row(10, "c")), "hr"),
+            Row(2, Array(Row(30, "e")), "engineering")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
 
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
-                 |USING source src
-                 |ON t.pk = src.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t
+               |USING source src
+               |ON t.pk = src.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
+          if (coerceNestedTypes) {
+            sql(mergeStmt)
+            // Missing field c3 should be filled with NULL
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Array(Row(1, "a", true)), "sales"),
+                Row(1, Array(Row(10, "c", null)), "hr"),
+                Row(2, Array(Row(30, "e", null)), "engineering")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
               sql(mergeStmt)
-              // Missing field c3 should be filled with NULL
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Array(Row(1, "a", true)), "sales"),
-                  Row(1, Array(Row(10, "c", null)), "hr"),
-                  Row(2, Array(Row(30, "e", null)), "engineering")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `a`.`element`.`c3`."))
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
 
   test("merge into with source missing fields in struct nested in map key") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            // Target table has struct with 2 fields in map key
-            val targetSchema =
-              StructType(Seq(
-                StructField("pk", IntegerType, nullable = false),
-                StructField("m", MapType(
-                  StructType(Seq(StructField("c1", IntegerType), 
StructField("c2", BooleanType))),
-                  StructType(Seq(StructField("c3", StringType))))),
-                StructField("dep", StringType)))
-            createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
-            val targetData = Seq(
-              Row(0, Map(Row(10, true) -> Row("x")), "hr"),
-              Row(1, Map(Row(20, false) -> Row("y")), "sales"))
-            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
-              .writeTo(tableNameAsString).append()
-
-            // Source table has struct with only 1 field (c1) in map key - 
missing c2
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType),
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          // Target table has struct with 2 fields in map key
+          val targetSchema =
+            StructType(Seq(
+              StructField("pk", IntegerType, nullable = false),
               StructField("m", MapType(
-                StructType(Seq(StructField("c1", IntegerType))), // missing c2
+                StructType(Seq(StructField("c1", IntegerType), 
StructField("c2", BooleanType))),
                 StructType(Seq(StructField("c3", StringType))))),
               StructField("dep", StringType)))
-            val sourceData = Seq(
-              Row(1, Map(Row(10) -> Row("z")), "sales"),
-              Row(2, Map(Row(20) -> Row("w")), "engineering")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(sourceData), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
+          createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
 
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
-                 |USING source src
-                 |ON t.pk = src.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
+          val targetData = Seq(
+            Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+            Row(1, Map(Row(20, false) -> Row("y")), "sales"))
+          spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
+            .writeTo(tableNameAsString).append()
 
-            if (coerceNestedTypes && withSchemaEvolution) {
+          // Source table has struct with only 1 field (c1) in map key - 
missing c2
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("m", MapType(
+              StructType(Seq(StructField("c1", IntegerType))), // missing c2
+              StructType(Seq(StructField("c3", StringType))))),
+            StructField("dep", StringType)))
+          val sourceData = Seq(
+            Row(1, Map(Row(10) -> Row("z")), "sales"),
+            Row(2, Map(Row(20) -> Row("w")), "engineering")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(sourceData), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t
+               |USING source src
+               |ON t.pk = src.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
+
+          if (coerceNestedTypes) {
+            sql(mergeStmt)
+            // Missing field c2 should be filled with NULL
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+                Row(1, Map(Row(10, null) -> Row("z")), "sales"),
+                Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
               sql(mergeStmt)
-              // Missing field c2 should be filled with NULL
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Map(Row(10, true) -> Row("x")), "hr"),
-                  Row(1, Map(Row(10, null) -> Row("z")), "sales"),
-                  Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `m`.`key`.`c2`."))
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
 
   test("merge into with source missing fields in struct nested in map value") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            // Target table has struct with 2 fields in map value
-            val targetSchema =
-              StructType(Seq(
-                StructField("pk", IntegerType, nullable = false),
-                StructField("m", MapType(
-                  StructType(Seq(StructField("c1", IntegerType))),
-                  StructType(Seq(StructField("c1", StringType), 
StructField("c2", BooleanType))))),
-                StructField("dep", StringType)))
-            createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
-            val targetData = Seq(
-              Row(0, Map(Row(10) -> Row("x", true)), "hr"),
-              Row(1, Map(Row(20) -> Row("y", false)), "sales"))
-            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
-              .writeTo(tableNameAsString).append()
-
-            // Source table has struct with only 1 field (c1) in map value - 
missing c2
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType),
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          // Target table has struct with 2 fields in map value
+          val targetSchema =
+            StructType(Seq(
+              StructField("pk", IntegerType, nullable = false),
               StructField("m", MapType(
                 StructType(Seq(StructField("c1", IntegerType))),
-                StructType(Seq(StructField("c1", StringType))))), // missing c2
+                StructType(Seq(StructField("c1", StringType), 
StructField("c2", BooleanType))))),
               StructField("dep", StringType)))
-            val sourceData = Seq(
-              Row(1, Map(Row(10) -> Row("z")), "sales"),
-              Row(2, Map(Row(20) -> Row("w")), "engineering")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(sourceData), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
-
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
-                 |USING source src
-                 |ON t.pk = src.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
-
-            if (coerceNestedTypes && withSchemaEvolution) {
-              sql(mergeStmt)
-              // Missing field c2 should be filled with NULL
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Map(Row(10) -> Row("x", true)), "hr"),
-                  Row(1, Map(Row(10) -> Row("z", null)), "sales"),
-                  Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
-            }
-          }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
-        }
-      }
-    }
-  }
+          createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
 
-  test("merge into with source missing fields in top-level struct") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            // Target table has struct with 3 fields at top level
-            createAndInitTable(
-              s"""pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
-                 |dep STRING""".stripMargin,
-              """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep": 
"sales"}""")
+          val targetData = Seq(
+            Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+            Row(1, Map(Row(20) -> Row("y", false)), "sales"))
+          spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
+            .writeTo(tableNameAsString).append()
 
-            // Source table has struct with only 2 fields (c1, c2) - missing c3
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StringType)))), // missing c3 field
-              StructField("dep", StringType)))
-            val data = Seq(
-              Row(1, Row(10, "b"), "hr"),
-              Row(2, Row(20, "c"), "engineering")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
+          // Source table has struct with only 1 field (c1) in map value - 
missing c2
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("m", MapType(
+              StructType(Seq(StructField("c1", IntegerType))),
+              StructType(Seq(StructField("c1", StringType))))), // missing c2
+            StructField("dep", StringType)))
+          val sourceData = Seq(
+            Row(1, Map(Row(10) -> Row("z")), "sales"),
+            Row(2, Map(Row(20) -> Row("w")), "engineering")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(sourceData), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
 
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
-                 |USING source src
-                 |ON t.pk = src.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t
+               |USING source src
+               |ON t.pk = src.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
+          if (coerceNestedTypes) {
+            sql(mergeStmt)
+            // Missing field c2 should be filled with NULL
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+                Row(1, Map(Row(10) -> Row("z", null)), "sales"),
+                Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
               sql(mergeStmt)
-              // Missing field c3 should be filled with NULL
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Row(1, "a", true), "sales"),
-                  Row(1, Row(10, "b", null), "hr"),
-                  Row(2, Row(20, "c", null), "engineering")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `m`.`value`.`c2`."))
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
 
-  test("merge into with source missing top-level column") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      withTempView("source") {
-        // Target table has 3 columns: pk, salary, dep
-        createAndInitTable(
-          s"""pk INT NOT NULL,
-             |salary INT,
-             |dep STRING""".stripMargin,
-          """{ "pk": 0, "salary": 100, "dep": "sales" }
-            |{ "pk": 1, "salary": 200, "dep": "hr" }"""
-            .stripMargin)
+  test("merge into with source missing fields in top-level struct") {
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          // Target table has struct with 3 fields at top level
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep": 
"sales"}""")
 
-        // Source table has only 2 columns: pk, dep (missing salary)
-        val sourceTableSchema = StructType(Seq(
-          StructField("pk", IntegerType, nullable = false),
-          StructField("dep", StringType)))
-        val data = Seq(
-          Row(1, "engineering"),
-          Row(2, "finance")
-        )
-        spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-          .createOrReplaceTempView("source")
+          // Source table has struct with only 2 fields (c1, c2) - missing c3
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StringType)))), // missing c3 field
+            StructField("dep", StringType)))
+          val data = Seq(
+            Row(1, Row(10, "b"), "hr"),
+            Row(2, Row(20, "c"), "engineering")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
 
-        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-        val mergeStmt =
-          s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
-             |USING source src
-             |ON t.pk = src.pk
-             |WHEN MATCHED THEN
-             | UPDATE SET *
-             |WHEN NOT MATCHED THEN
-             | INSERT *
-             |""".stripMargin
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t
+               |USING source src
+               |ON t.pk = src.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
 
-        if (withSchemaEvolution) {
-          sql(mergeStmt)
-          checkAnswer(
-            sql(s"SELECT * FROM $tableNameAsString"),
-            Seq(
-              Row(0, 100, "sales"),
-              Row(1, 200, "engineering"),
-              Row(2, null, "finance")))
-        } else {
-          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+          if (coerceNestedTypes) {
             sql(mergeStmt)
+            // Missing field c3 should be filled with NULL
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Row(1, "a", true), "sales"),
+                Row(1, Row(10, "b", null), "hr"),
+                Row(2, Row(20, "c", null), "engineering")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+              sql(mergeStmt)
+            }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `s`.`c3`."))
           }
-          assert(exception.errorClass.get ==
-            "UNRESOLVED_COLUMN.WITH_SUGGESTION")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
-      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
     }
   }
 
@@ -4866,69 +4884,70 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("merge with with null struct with missing nested field") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            // Target table has nested struct with fields c1 and c2
-            createAndInitTable(
-              s"""pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
-                 |dep STRING""".stripMargin,
-              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
-                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
-                .stripMargin)
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          // Target table has nested struct with fields c1 and c2
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
+              |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
+              .stripMargin)
 
-            // Source table has null for the nested struct
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                  // missing field 'b'
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
+          // Source table has null for the nested struct
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType)
+                // missing field 'b'
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
 
-            val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
+          val data = Seq(
+            Row(1, null, "engineering"),
+            Row(2, null, "finance")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
 
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
-                 |ON t.pk = source.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t USING source
+               |ON t.pk = source.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
+          if (coerceNestedTypes) {
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Row(1, Row(10, "x")), "sales"),
+                Row(1, null, "engineering"),
+                Row(2, null, "finance")))
+          } else {
+            // Without coercion, the merge should fail due to missing field
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
               sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
-                  Row(2, null, "finance")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `s`.`c2`.`b`."))
           }
         }
-        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
     }
   }
 
@@ -4979,21 +4998,37 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                    | INSERT *
                    |""".stripMargin
 
-              if (coerceNestedTypes && withSchemaEvolution) {
-                // extra nested field is added
-                sql(mergeStmt)
-                checkAnswer(
-                  sql(s"SELECT * FROM $tableNameAsString"),
-                  Seq(
-                    Row(0, Row(1, Row(10, "x", null)), "sales"),
-                    Row(1, null, "engineering"),
-                    Row(2, null, "finance")))
+              if (coerceNestedTypes) {
+                if (withSchemaEvolution) {
+                  // extra nested field is added
+                  sql(mergeStmt)
+                  checkAnswer(
+                    sql(s"SELECT * FROM $tableNameAsString"),
+                    Seq(
+                      Row(0, Row(1, Row(10, "x", null)), "sales"),
+                      Row(1, null, "engineering"),
+                      Row(2, null, "finance")))
+                } else {
+                  // extra nested field is not added
+                  val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                    sql(mergeStmt)
+                  }
+                  assert(exception.errorClass.get ==
+                    "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                  assert(exception.getMessage.contains(
+                    "Cannot write incompatible data for the table ``: " +
+                      "Cannot write extra fields `c` to the struct `s`.`c2`"))
+                }
               } else {
+                // Without source struct coercion, the merge should fail
                 val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                   sql(mergeStmt)
                 }
                 assert(exception.errorClass.get ==
                   "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+                assert(exception.getMessage.contains(
+                  "Cannot write incompatible data for the table ``: " +
+                    "Cannot find data for the output column `s`.`c2`.`b`."))
               }
             }
           }
@@ -5062,83 +5097,82 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("merge with null struct using default value") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          withTempView("source") {
-            sql(
-              s"""CREATE TABLE $tableNameAsString (
-                 | pk INT NOT NULL,
-                 | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
-                 |   named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 
'default')),
-                 | dep STRING)
-                 |PARTITIONED BY (dep)
-                 |""".stripMargin)
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        withTempView("source") {
+          sql(
+            s"""CREATE TABLE $tableNameAsString (
+               | pk INT NOT NULL,
+               | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+               |   named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 
'default')),
+               | dep STRING)
+               |PARTITIONED BY (dep)
+               |""".stripMargin)
 
-            val initialSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType),
-                  StructField("b", StringType)
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
-            val initialData = Seq(
-              Row(0, Row(1, Row(10, "x")), "sales"),
-              Row(1, Row(2, Row(20, "y")), "hr")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(initialData), 
initialSchema)
-              .writeTo(tableNameAsString).append()
+          val initialSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StringType)
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
+          val initialData = Seq(
+            Row(0, Row(1, Row(10, "x")), "sales"),
+            Row(1, Row(2, Row(20, "y")), "hr")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(initialData), 
initialSchema)
+            .writeTo(tableNameAsString).append()
 
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType)
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
 
-            val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source")
+          val data = Seq(
+            Row(1, null, "engineering"),
+            Row(2, null, "finance")
+          )
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
 
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt =
-              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
-                 |ON t.pk = source.pk
-                 |WHEN MATCHED THEN
-                 | UPDATE SET *
-                 |WHEN NOT MATCHED THEN
-                 | INSERT *
-                 |""".stripMargin
+          val mergeStmt =
+            s"""MERGE INTO $tableNameAsString t USING source
+               |ON t.pk = source.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
 
-            if (coerceNestedTypes && withSchemaEvolution) {
+          if (coerceNestedTypes) {
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Row(1, Row(10, "x")), "sales"),
+                Row(1, null, "engineering"),
+                Row(2, null, "finance")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
               sql(mergeStmt)
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
-                  Row(2, null, "finance")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                sql(mergeStmt)
-              }
-              assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
+            assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot find data for the output column `s`.`c2`.`b`"))
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
@@ -5209,8 +5243,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("merge into with source missing fields in nested struct") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { nestedTypeCoercion =>
+    Seq(true, false).foreach { nestedTypeCoercion =>
         withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key
             -> nestedTypeCoercion.toString) {
           withTempView("source") {
@@ -5242,8 +5275,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               .createOrReplaceTempView("source")
 
             // Missing field b should be filled with NULL
-            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
-            val mergeStmt = s"""MERGE $schemaEvolutionClause INTO 
$tableNameAsString t
+            val mergeStmt = s"""MERGE INTO $tableNameAsString t
                                |USING source src
                                |ON t.pk = src.pk
                                |WHEN MATCHED THEN
@@ -5252,7 +5284,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                                | INSERT *
                                |""".stripMargin
 
-            if (nestedTypeCoercion && withSchemaEvolution) {
+            if (nestedTypeCoercion) {
               sql(mergeStmt)
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
@@ -5260,17 +5292,16 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                   Row(1, Row(10, Row(20, null)), "sales"),
                   Row(2, Row(20, Row(30, null)), "engineering")))
             } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+              val exception = intercept[Exception] {
                 sql(mergeStmt)
               }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                """Cannot write incompatible data for the table 
``""".stripMargin))
             }
           }
           sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
       }
-    }
   }
 
   test("merge with named_struct missing non-nullable field") {
@@ -5895,18 +5926,30 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               .whenNotMatched()
               .insertAll()
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              mergeBuilder.withSchemaEvolution().merge()
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
-                  Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                mergeBuilder.withSchemaEvolution().merge()
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(
+                    Row(1, Row(10, Row(null, Map("c" -> "d"), false)), 
"sales"),
+                    Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  mergeBuilder.merge()
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 mergeBuilder.merge()
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `s`.`c2`.`a`"))
             }
 
             sql(s"DROP TABLE $tableNameAsString")
@@ -5995,18 +6038,30 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               .whenNotMatched()
               .insertAll()
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              mergeBuilder.withSchemaEvolution().merge()
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
-                  Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+            if (coerceNestedTypes) {
+              if (withSchemaEvolution) {
+                mergeBuilder.withSchemaEvolution().merge()
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(
+                    Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+                    Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  mergeBuilder.merge()
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                assert(exception.getMessage.contains(
+                  "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+              }
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 mergeBuilder.merge()
               }
               assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.getMessage.contains(
+                "Cannot find data for the output column `s`.`c2`.`a`"))
             }
 
             sql(s"DROP TABLE $tableNameAsString")
@@ -6077,190 +6132,198 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
   }
 
   test("merge into with source missing fields in top-level struct using 
dataframe API") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          val sourceTable = "cat.ns1.source_table"
-          withTable(sourceTable) {
-            // Target table has struct with 3 fields at top level
-            sql(
-              s"""CREATE TABLE $tableNameAsString (
-                 |pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
-                 |dep STRING)""".stripMargin)
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        val sourceTable = "cat.ns1.source_table"
+        withTable(sourceTable) {
+          // Target table has struct with 3 fields at top level
+          sql(
+            s"""CREATE TABLE $tableNameAsString (
+               |pk INT NOT NULL,
+               |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+               |dep STRING)""".stripMargin)
 
-            val targetData = Seq(
-              Row(0, Row(1, "a", true), "sales")
-            )
-            val targetSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StringType),
-                StructField("c3", BooleanType)
-              ))),
-              StructField("dep", StringType)
-            ))
-            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
-              .writeTo(tableNameAsString).append()
+          val targetData = Seq(
+            Row(0, Row(1, "a", true), "sales")
+          )
+          val targetSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StringType),
+              StructField("c3", BooleanType)
+            ))),
+            StructField("dep", StringType)
+          ))
+          spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
+            .writeTo(tableNameAsString).append()
 
-            // Create source table with struct having only 2 fields (c1, c2) - 
missing c3
-            val sourceIdent = Identifier.of(Array("ns1"), "source_table")
-            val columns = Array(
-              Column.create("pk", IntegerType, false),
-              Column.create("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StringType)))), // missing c3 field
-              Column.create("dep", StringType))
-            val tableInfo = new TableInfo.Builder()
-              .withColumns(columns)
-              .withProperties(extraTableProps)
-              .build()
-            catalog.createTable(sourceIdent, tableInfo)
+          // Create source table with struct having only 2 fields (c1, c2) - 
missing c3
+          val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+          val columns = Array(
+            Column.create("pk", IntegerType, false),
+            Column.create("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StringType)))), // missing c3 field
+            Column.create("dep", StringType))
+          val tableInfo = new TableInfo.Builder()
+            .withColumns(columns)
+            .withProperties(extraTableProps)
+            .build()
+          catalog.createTable(sourceIdent, tableInfo)
 
-            val data = Seq(
-              Row(1, Row(10, "b"), "hr"),
-              Row(2, Row(20, "c"), "engineering")
-            )
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StringType)))),
-              StructField("dep", StringType)))
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source_temp")
+          val data = Seq(
+            Row(1, Row(10, "b"), "hr"),
+            Row(2, Row(20, "c"), "engineering")
+          )
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StringType)))),
+            StructField("dep", StringType)))
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source_temp")
 
-            sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+          sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
 
-            val mergeBuilder = spark.table(sourceTable)
+          if (coerceNestedTypes) {
+            spark.table(sourceTable)
               .mergeInto(tableNameAsString, $"source_table.pk" === 
col(tableNameAsString + ".pk"))
               .whenMatched()
               .updateAll()
               .whenNotMatched()
               .insertAll()
+              .merge()
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              mergeBuilder.withSchemaEvolution().merge()
-
-              // Missing field c3 should be filled with NULL
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Row(1, "a", true), "sales"),
-                  Row(1, Row(10, "b", null), "hr"),
-                  Row(2, Row(20, "c", null), "engineering")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                mergeBuilder.merge()
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            // Missing field c3 should be filled with NULL
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Row(1, "a", true), "sales"),
+                Row(1, Row(10, "b", null), "hr"),
+                Row(2, Row(20, "c", null), "engineering")))
+          } else {
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+              spark.table(sourceTable)
+                .mergeInto(tableNameAsString, $"source_table.pk" === 
col(tableNameAsString + ".pk"))
+                .whenMatched()
+                .updateAll()
+                .whenNotMatched()
+                .insertAll()
+                .merge()
             }
-
-            sql(s"DROP TABLE $tableNameAsString")
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `s`.`c3`."))
           }
+
+          sql(s"DROP TABLE $tableNameAsString")
         }
       }
     }
   }
 
   test("merge with null struct with missing nested field using dataframe API") 
{
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-          val sourceTable = "cat.ns1.source_table"
-          withTable(sourceTable) {
-            // Target table has nested struct with fields c1 and c2
-            sql(
-              s"""CREATE TABLE $tableNameAsString (
-                 |pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
-                 |dep STRING)""".stripMargin)
-
-            val targetData = Seq(
-              Row(0, Row(1, Row(10, "x")), "sales"),
-              Row(1, Row(2, Row(20, "y")), "hr")
-            )
-            val targetSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType),
-                  StructField("b", StringType)
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
-            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
-              .writeTo(tableNameAsString).append()
+    Seq(true, false).foreach { coerceNestedTypes =>
+      withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+        coerceNestedTypes.toString) {
+        val sourceTable = "cat.ns1.source_table"
+        withTable(sourceTable) {
+          // Target table has nested struct with fields c1 and c2
+          sql(
+            s"""CREATE TABLE $tableNameAsString (
+               |pk INT NOT NULL,
+               |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+               |dep STRING)""".stripMargin)
+
+          val targetData = Seq(
+            Row(0, Row(1, Row(10, "x")), "sales"),
+            Row(1, Row(2, Row(20, "y")), "hr")
+          )
+          val targetSchema = StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StringType)
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
+          spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetSchema)
+            .writeTo(tableNameAsString).append()
 
-            // Create source table with missing nested field 'b'
-            val sourceIdent = Identifier.of(Array("ns1"), "source_table")
-            val columns = Array(
-              Column.create("pk", IntegerType, false),
-              Column.create("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                  // missing field 'b'
-                )))
-              ))),
-              Column.create("dep", StringType))
-            val tableInfo = new TableInfo.Builder()
-              .withColumns(columns)
-              .withProperties(extraTableProps)
-              .build()
-            catalog.createTable(sourceIdent, tableInfo)
+          // Create source table with missing nested field 'b'
+          val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+          val columns = Array(
+            Column.create("pk", IntegerType, false),
+            Column.create("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType)
+                // missing field 'b'
+              )))
+            ))),
+            Column.create("dep", StringType))
+          val tableInfo = new TableInfo.Builder()
+            .withColumns(columns)
+            .withProperties(extraTableProps)
+            .build()
+          catalog.createTable(sourceIdent, tableInfo)
 
-            // Source table has null for the nested struct
-            val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
-            )
-            val sourceTableSchema = StructType(Seq(
-              StructField("pk", IntegerType),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
-            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-              .createOrReplaceTempView("source_temp")
+          // Source table has null for the nested struct
+          val data = Seq(
+            Row(1, null, "engineering"),
+            Row(2, null, "finance")
+          )
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("s", StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType)
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source_temp")
 
-            sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
-            val mergeBuilder = spark.table(sourceTable)
-              .mergeInto(tableNameAsString,
-                $"source_table.pk" === col(tableNameAsString + ".pk"))
-              .whenMatched()
-              .updateAll()
-              .whenNotMatched()
-              .insertAll()
+          sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+          val mergeBuilder = spark.table(sourceTable)
+            .mergeInto(tableNameAsString,
+              $"source_table.pk" === col(tableNameAsString + ".pk"))
+            .whenMatched()
+            .updateAll()
+            .whenNotMatched()
+            .insertAll()
 
-            if (coerceNestedTypes && withSchemaEvolution) {
-              mergeBuilder.withSchemaEvolution().merge()
-              checkAnswer(
-                sql(s"SELECT * FROM $tableNameAsString"),
-                Seq(
-                  Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
-                  Row(2, null, "finance")))
-            } else {
-              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                mergeBuilder.merge()
-              }
-              assert(exception.errorClass.get ==
-                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+          if (coerceNestedTypes) {
+            mergeBuilder.merge()
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(0, Row(1, Row(10, "x")), "sales"),
+                Row(1, null, "engineering"),
+                Row(2, null, "finance")))
+          } else {
+            // Without coercion, the merge should fail due to missing field
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+              mergeBuilder.merge()
             }
-
-            sql(s"DROP TABLE $tableNameAsString")
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot write incompatible data for the table ``: " +
+                "Cannot find data for the output column `s`.`c2`.`b`."))
           }
+
+          sql(s"DROP TABLE $tableNameAsString")
         }
       }
     }
@@ -6346,21 +6409,37 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                 .whenNotMatched()
                 .insertAll()
 
-              if (coerceNestedTypes && withSchemaEvolution) {
-                // extra nested field is added
-                mergeBuilder.withSchemaEvolution().merge()
-                checkAnswer(
-                  sql(s"SELECT * FROM $tableNameAsString"),
-                  Seq(
-                    Row(0, Row(1, Row(10, "x", null)), "sales"),
-                    Row(1, null, "engineering"),
-                    Row(2, null, "finance")))
+              if (coerceNestedTypes) {
+                if (withSchemaEvolution) {
+                  // extra nested field is added
+                  mergeBuilder.withSchemaEvolution().merge()
+                  checkAnswer(
+                    sql(s"SELECT * FROM $tableNameAsString"),
+                    Seq(
+                      Row(0, Row(1, Row(10, "x", null)), "sales"),
+                      Row(1, null, "engineering"),
+                      Row(2, null, "finance")))
+                } else {
+                  // extra nested field is not added
+                  val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                    mergeBuilder.merge()
+                  }
+                  assert(exception.errorClass.get ==
+                    "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+                  assert(exception.getMessage.contains(
+                    "Cannot write incompatible data for the table ``: " +
+                      "Cannot write extra fields `c` to the struct `s`.`c2`"))
+                }
               } else {
+                // Without source struct coercion, the merge should fail
                 val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                   mergeBuilder.merge()
                 }
                 assert(exception.errorClass.get ==
                   "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+                assert(exception.getMessage.contains(
+                  "Cannot write incompatible data for the table ``: " +
+                    "Cannot find data for the output column `s`.`c2`.`b`."))
               }
 
               sql(s"DROP TABLE $tableNameAsString")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8e5ee1644f9c..18042bf73adf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1633,10 +1633,10 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
 
       if (starInUpdate) {
         assert(updateAssigns.size == 2)
-        
assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
-        
assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
-        
assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
-        
assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
+        
assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
+        
assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
+        
assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
+        
assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
       } else {
         assert(updateAssigns.size == 1)
         
assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
@@ -1656,10 +1656,10 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
 
       if (starInInsert) {
         assert(insertAssigns.size == 2)
-        
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
-        
assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
-        
assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
-        
assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
+        
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
+        
assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
+        
assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
+        
assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
       } else {
         assert(insertAssigns.size == 2)
         
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
@@ -1720,40 +1720,8 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
           case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
         }
 
-        // star with schema evolution
-        val sqlStarSchemaEvolution =
-          s"""
-             |MERGE WITH SCHEMA EVOLUTION INTO $target AS target
-             |USING $source AS source
-             |ON target.i = source.i
-             |WHEN MATCHED AND (target.s='delete') THEN DELETE
-             |WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
-             |WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
-           """.stripMargin
-        parseAndResolve(sqlStarSchemaEvolution) match {
-          case MergeIntoTable(
-          SubqueryAlias(AliasIdentifier("target", Seq()), 
AsDataSourceV2Relation(target)),
-          SubqueryAlias(AliasIdentifier("source", Seq()), 
AsDataSourceV2Relation(source)),
-          mergeCondition,
-          Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, 
StringLiteral("delete")))),
-          UpdateAction(Some(EqualTo(ul: AttributeReference,
-          StringLiteral("update"))), updateAssigns, _)),
-          Seq(InsertAction(Some(EqualTo(il: AttributeReference, 
StringLiteral("insert"))),
-          insertAssigns)),
-          Seq(),
-          withSchemaEvolution) =>
-            checkMergeConditionResolution(target, source, mergeCondition)
-            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), 
updateAssigns,
-              starInUpdate = true)
-            checkNotMatchedClausesResolution(target, source, Some(il), 
insertAssigns,
-              starInInsert = true)
-            assert(withSchemaEvolution === true)
-
-          case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
-        }
-
         // star
-        val sqlStarWithoutSchemaEvolution =
+        val sql2 =
           s"""
              |MERGE INTO $target AS target
              |USING $source AS source
@@ -1762,7 +1730,7 @@ class PlanResolutionSuite extends SharedSparkSession with 
AnalysisTest {
              |WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
              |WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
            """.stripMargin
-        parseAndResolve(sqlStarWithoutSchemaEvolution) match {
+        parseAndResolve(sql2) match {
           case MergeIntoTable(
               SubqueryAlias(AliasIdentifier("target", Seq()), 
AsDataSourceV2Relation(target)),
               SubqueryAlias(AliasIdentifier("source", Seq()), 
AsDataSourceV2Relation(source)),
@@ -2368,11 +2336,24 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
          |USING testcat.tab2
          |ON 1 = 1
          |WHEN MATCHED THEN UPDATE SET *""".stripMargin
-    checkError(
-      exception = intercept[AnalysisException](parseAndResolve(sql2)),
-      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-      parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
-      context = ExpectedContext(fragment = sql2, start = 0, stop = 80))
+    val parsed2 = parseAndResolve(sql2)
+    parsed2 match {
+      case MergeIntoTable(
+          AsDataSourceV2Relation(target),
+          AsDataSourceV2Relation(source),
+          EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+          Seq(UpdateAction(None, updateAssigns, _)), // Matched actions
+          Seq(), // Not matched actions
+          Seq(), // Not matched by source actions
+          withSchemaEvolution) =>
+        val ti = target.output.find(_.name == "i").get
+        val si = source.output.find(_.name == "i").get
+        assert(updateAssigns.size == 1)
+        
assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+        
assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+        assert(withSchemaEvolution === false)
+      case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
+    }
 
     // INSERT * with incompatible schema between source and target tables.
     val sql3 =
@@ -2380,11 +2361,24 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
         |USING testcat.tab2
         |ON 1 = 1
         |WHEN NOT MATCHED THEN INSERT *""".stripMargin
-    checkError(
-      exception = intercept[AnalysisException](parseAndResolve(sql3)),
-      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-      parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
-      context = ExpectedContext(fragment = sql3, start = 0, stop = 80))
+    val parsed3 = parseAndResolve(sql3)
+    parsed3 match {
+      case MergeIntoTable(
+          AsDataSourceV2Relation(target),
+          AsDataSourceV2Relation(source),
+          EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+          Seq(), // Matched action
+          Seq(InsertAction(None, insertAssigns)), // Not matched actions
+          Seq(), // Not matched by source actions
+          withSchemaEvolution) =>
+        val ti = target.output.find(_.name == "i").get
+        val si = source.output.find(_.name == "i").get
+        assert(insertAssigns.size == 1)
+        
assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+        
assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+        assert(withSchemaEvolution === false)
+      case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
+    }
 
     val sql4 =
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to