This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new aedd42be645e [SPARK-54621][SQL] Merge Into Update Set * preserve 
nested fields if …coerceNestedTypes is enabled
aedd42be645e is described below

commit aedd42be645e7e7492d240989d9325e750ddb843
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Dec 18 15:43:25 2025 +0800

    [SPARK-54621][SQL] Merge Into Update Set * preserve nested fields if 
…coerceNestedTypes is enabled
    
    ### What changes were proposed in this pull request?
    The 'struct coercion' feature for MERGE INTO (allowing it to pass when 
assigning a struct with fewer fields to a struct with more fields) was turned 
off behind a flag in https://github.com/apache/spark/pull/53229 due to some 
ambiguity in behavior, but was not removed because the community wanted to try 
it.
    
    We still want to keep it behind a flag, but we now choose which behavior 
to support when the flag is on.  In particular, we want UPDATE SET * to expand 
to all nested struct fields, so that in this scenario existing nested struct 
fields that are missing from the source are preserved.
    
    ### Why are the changes needed?
    
    aokolnychyi tested the feature and thinks that, even if it is behind the 
experimental flag, we should take the stance for now that UPDATE SET * should 
expand to all nested fields rather than only top-level columns.
    
    The rationale being:
    * it's always safer not to overwrite user values with null
    * Spark in general tries to treat nested fields like columns
    * there's already a way for the user to override the whole struct (and 
nullify non-existing fields) by specifying the struct explicitly, i.e., UPDATE 
SET struct = source.struct
    
    ### Does this PR introduce _any_ user-facing change?
    No, the whole feature is new and hidden behind an experimental flag.
    
    ### How was this patch tested?
    Existing tests (some expected outputs changed to no longer be null)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53360 from szehon-ho/SPARK-54621.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 92e5b3644d567392024464e4b3bd5f28fe69a550)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/AssignmentUtils.scala    |  163 ++-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 1111 +++++++++++++++++---
 2 files changed, 1091 insertions(+), 183 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 6c7b0626e81e..df4b0646ed42 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -19,15 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import 
org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE,
 RECURSE}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, Expression, GetStructField, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Assignment
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -72,7 +74,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport 
{
         assignments,
         addError = err => errors += err,
         colPath = Seq(attr.name),
-        coerceNestedTypes)
+        coerceNestedTypes,
+        fromStar)
     }
 
     if (errors.nonEmpty) {
@@ -156,7 +159,8 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
       assignments: Seq[Assignment],
       addError: String => Unit,
       colPath: Seq[String],
-      coerceNestedTypes: Boolean = false): Expression = {
+      coerceNestedTypes: Boolean = false,
+      updateStar: Boolean = false): Expression = {
 
     val (exactAssignments, otherAssignments) = assignments.partition { 
assignment =>
       assignment.key.semanticEquals(colExpr)
@@ -178,11 +182,25 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
     } else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) {
       TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
     } else if (exactAssignments.nonEmpty) {
-      val value = exactAssignments.head.value
-      val coerceMode = if (coerceNestedTypes) RECURSE else NONE
-      val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, 
conf, addError,
-        colPath, coerceMode)
-      resolvedValue
+      if (updateStar && SQLConf.get.coerceMergeNestedTypes) {
+        val value = exactAssignments.head.value
+        col.dataType match {
+          case _: StructType =>
+            // Expand assignments to leaf fields (fixNullExpansion is applied 
inside)
+            applyNestedFieldAssignments(col, colExpr, value, addError, colPath,
+              coerceNestedTypes)
+          case _ =>
+            // For non-struct types, resolve directly
+            val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+            TableOutputResolver.resolveUpdate("", value, col, conf, addError, 
colPath,
+              coerceMode)
+        }
+      } else {
+        val value = exactAssignments.head.value
+        val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+        TableOutputResolver.resolveUpdate("", value, col, conf, addError,
+          colPath, coerceMode)
+      }
     } else {
       applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath, 
coerceNestedTypes)
     }
@@ -211,7 +229,67 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
       case otherType =>
         addError(
           "Updating nested fields is only supported for StructType but " +
-          s"'${colPath.quoted}' is of type $otherType")
+            s"'${colPath.quoted}' is of type $otherType")
+        colExpr
+    }
+  }
+
+  private def applyNestedFieldAssignments(
+      col: Attribute,
+      colExpr: Expression,
+      value: Expression,
+      addError: String => Unit,
+      colPath: Seq[String],
+      coerceNestedTypes: Boolean): Expression = {
+
+    col.dataType match {
+      case structType: StructType =>
+        val fieldAttrs = DataTypeUtils.toAttributes(structType)
+
+        val updatedFieldExprs = fieldAttrs.zipWithIndex.map { case (fieldAttr, 
ordinal) =>
+          val fieldPath = colPath :+ fieldAttr.name
+          val targetFieldExpr = GetStructField(colExpr, ordinal, 
Some(fieldAttr.name))
+
+          // Try to find a corresponding field in the source value by name
+          val sourceFieldValue: Expression = value.dataType match {
+            case valueStructType: StructType =>
+              valueStructType.fields.find(f => conf.resolver(f.name, 
fieldAttr.name)) match {
+                case Some(matchingField) =>
+                  // Found matching field in source, extract it
+                  val fieldIndex = 
valueStructType.fieldIndex(matchingField.name)
+                  GetStructField(value, fieldIndex, Some(matchingField.name))
+                case None =>
+                  // Field doesn't exist in source, use target's current value 
with null check
+                  TableOutputResolver.checkNullability(targetFieldExpr, 
fieldAttr, conf, fieldPath)
+              }
+            case _ =>
+              // Value is not a struct, cannot extract field
+              addError(s"Cannot assign non-struct value to struct field 
'${fieldPath.quoted}'")
+              Literal(null, fieldAttr.dataType)
+          }
+
+          // Recurse or resolve based on field type
+          fieldAttr.dataType match {
+            case _: StructType =>
+              // Field is a struct, recurse
+              applyNestedFieldAssignments(fieldAttr, targetFieldExpr,
+                sourceFieldValue, addError, fieldPath, coerceNestedTypes)
+            case _ =>
+              // Field is not a struct, resolve with TableOutputResolver
+              val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+              TableOutputResolver.resolveUpdate("", sourceFieldValue, 
fieldAttr, conf, addError,
+                fieldPath, coerceMode)
+          }
+        }
+        val namedStruct = toNamedStruct(structType, updatedFieldExprs)
+
+        // Prevent unnecessary null struct expansion
+        fixNullExpansion(colExpr, value, structType, namedStruct, colPath)
+
+      case otherType =>
+        addError(
+          "Updating nested fields is only supported for StructType but " +
+            s"'${colPath.quoted}' is of type $otherType")
         colExpr
     }
   }
@@ -223,6 +301,73 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
     CreateNamedStruct(namedStructExprs)
   }
 
+  /**
+   * Checks if target struct has extra fields compared to source struct, 
recursively.
+   */
+  private def hasExtraTargetFields(targetType: StructType, sourceType: 
DataType): Boolean = {
+    sourceType match {
+      case sourceStructType: StructType =>
+        targetType.fields.exists { targetField =>
+          sourceStructType.fields.find(f => conf.resolver(f.name, 
targetField.name)) match {
+            case Some(sourceField) =>
+              // Check nested structs recursively
+              (targetField.dataType, sourceField.dataType) match {
+                case (targetNested: StructType, sourceNested) =>
+                  hasExtraTargetFields(targetNested, sourceNested)
+                case _ => false
+              }
+            case None => true // target has extra field not in source
+          }
+        }
+      case _ =>
+        // Should be caught earlier
+        throw SparkException.internalError(
+          s"Source type must be StructType but found: $sourceType")
+    }
+  }
+
+  /**
+   * As UPDATE SET * assigns struct fields individually (preserving existing 
fields),
+   * this will lead to indiscriminate null expansion, ie, a struct is created 
where all
+   * fields are null.  Wraps a struct assignment with a condition to return 
null
+   * if both conditions are true:
+   *
+   * - source struct is null
+   * - target struct is null OR target struct is same as source struct
+   *
+   * If the condition is not true, we preserve the original structure.
+   * This includes cases where the source was a struct of nulls,
+   * or there were any extra target fields (including null ones),
+   * both cases retain the assignment to a struct of nulls.
+   *
+   * @param key the original assignment key (target struct) expression
+   * @param value the original assignment value (source struct) expression
+   * @param structType the target struct type
+   * @param structExpression the resulting create-struct expression to wrap
+   * @param colPath the column path for error reporting
+   * @return the wrapped expression with null checks
+   */
+  private def fixNullExpansion(
+      key: Expression,
+      value: Expression,
+      structType: StructType,
+      structExpression: Expression,
+      colPath: Seq[String]): Expression = {
+    if (key.nullable) {
+      val condition = if (hasExtraTargetFields(structType, value.dataType)) {
+        // extra target fields: return null iff source struct is null and 
target struct is null
+        And(IsNull(value), IsNull(key))
+      } else {
+        // schemas match: return null iff source struct is null
+        IsNull(value)
+      }
+
+      If(condition, Literal(null, structExpression.dataType), structExpression)
+    } else {
+      structExpression
+    }
+  }
+
   /**
    * Checks whether assignments are aligned and compatible with table columns.
    *
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 5d1173b5a1a5..ea19a916b7cc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -3240,9 +3240,8 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+                  Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), 
"sales"),
                   Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
-
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
@@ -4776,8 +4775,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
     sql(s"DROP TABLE IF EXISTS $tableNameAsString")
   }
 
-
-  test("merge with null struct - update field") {
+  test("merge with struct of nulls") {
     withTempView("source") {
       createAndInitTable(
         s"""pk INT NOT NULL,
@@ -4797,9 +4795,10 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
         StructField("dep", StringType)
       ))
 
+      // Source has a struct with null field values (not a null struct)
       val data = Seq(
-        Row(1, null, "engineering"),
-        Row(2, null, "finance")
+        Row(1, Row(null, null), "engineering"),
+        Row(2, Row(null, null), "finance")
       )
       spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
         .createOrReplaceTempView("source")
@@ -4808,31 +4807,32 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
         s"""MERGE INTO $tableNameAsString t USING source
            |ON t.pk = source.pk
            |WHEN MATCHED THEN
-           | UPDATE SET s = source.s
+           | UPDATE SET *
            |WHEN NOT MATCHED THEN
            | INSERT *
            |""".stripMargin)
+      // Struct of null values should be preserved, not converted to null 
struct
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
         Seq(
           Row(0, Row(1, "a"), "sales"),
-          Row(1, null, "hr"),
-          Row(2, null, "finance")))
+          Row(1, Row(null, null), "engineering"),
+          Row(2, Row(null, null), "finance")))
     }
     sql(s"DROP TABLE IF EXISTS $tableNameAsString")
   }
 
-  test("merge with null struct into non-nullable struct column") {
+  test("merge with null struct into struct of nulls") {
     withTempView("source") {
       createAndInitTable(
         s"""pk INT NOT NULL,
-           |s STRUCT<c1: INT, c2: STRING> NOT NULL,
+           |s STRUCT<c1: INT, c2: STRING>,
            |dep STRING""".stripMargin,
         """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
-          |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+          |{ "pk": 1, "s": { "c1": null, "c2": null }, "dep": "hr" }"""
           .stripMargin)
 
-      // Source table has null for the struct column
+      // Source table matches target table schema
       val sourceTableSchema = StructType(Seq(
         StructField("pk", IntegerType),
         StructField("s", StructType(Seq(
@@ -4842,61 +4842,60 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
         StructField("dep", StringType)
       ))
 
+      // Source has a null struct (not a struct of nulls)
       val data = Seq(
-        Row(1, null, "engineering"),
-        Row(2, null, "finance")
+        Row(1, null, "engineering")
       )
       spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
         .createOrReplaceTempView("source")
 
-      // Should throw an exception when trying to insert/update null into NOT 
NULL column
-      val exception = intercept[Exception] {
-        sql(
-          s"""MERGE INTO $tableNameAsString t USING source
-             |ON t.pk = source.pk
-             |WHEN MATCHED THEN
-             | UPDATE SET *
-             |WHEN NOT MATCHED THEN
-             | INSERT *
-             |""".stripMargin)
-      }
-      assert(exception.getMessage.contains(
-        "NULL value appeared in non-nullable field"))
+      sql(
+        s"""MERGE INTO $tableNameAsString t USING source
+           |ON t.pk = source.pk
+           |WHEN MATCHED THEN
+           | UPDATE SET *
+           |WHEN NOT MATCHED THEN
+           | INSERT *
+           |""".stripMargin)
+      // Null struct should override struct of nulls
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(0, Row(1, "a"), "sales"),
+          Row(1, null, "engineering")))
     }
     sql(s"DROP TABLE IF EXISTS $tableNameAsString")
   }
 
-  test("merge with with null struct with missing nested field") {
+  test("merge with null struct into struct of nulls with extra target field") {
     Seq(true, false).foreach { withSchemaEvolution =>
       Seq(true, false).foreach { coerceNestedTypes =>
         withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
           coerceNestedTypes.toString) {
           withTempView("source") {
-            // Target table has nested struct with fields c1 and c2
+            // Target has struct with 3 fields, row 1 has all nulls including 
extra field c3
             createAndInitTable(
               s"""pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                 |s STRUCT<c1: INT, c2: STRING, c3: INT>,
                  |dep STRING""".stripMargin,
-              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
-                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
+              """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": 
"sales" }
+                |{ "pk": 1, "s": { "c1": null, "c2": null, "c3": null }, 
"dep": "hr" }"""
                 .stripMargin)
 
-            // Source table has null for the nested struct
+            // Source table has struct with 2 fields (missing c3)
             val sourceTableSchema = StructType(Seq(
               StructField("pk", IntegerType),
               StructField("s", StructType(Seq(
                 StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                  // missing field 'b'
-                )))
+                StructField("c2", StringType)
+                // missing field c3
               ))),
               StructField("dep", StringType)
             ))
 
+            // Source has a null struct (not a struct of nulls)
             val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
+              Row(1, null, "engineering")
             )
             spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
               .createOrReplaceTempView("source")
@@ -4913,12 +4912,12 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
 
             if (coerceNestedTypes && withSchemaEvolution) {
               sql(mergeStmt)
+              // Because target has extra field c3, we preserve struct of nulls
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
-                  Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
-                  Row(2, null, "finance")))
+                  Row(0, Row(1, "a", 10), "sales"),
+                  Row(1, Row(null, null, null), "engineering")))
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
@@ -4933,114 +4932,42 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
     }
   }
 
-  test("merge null struct with schema evolution - source with missing and 
extra nested fields") {
-    Seq(true, false).foreach { withSchemaEvolution =>
-      Seq(true, false).foreach { coerceNestedTypes =>
-        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
-          coerceNestedTypes.toString) {
-            withTempView("source") {
-              // Target table has nested struct with fields c1 and c2
-              createAndInitTable(
-                s"""pk INT NOT NULL,
-                   |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
-                   |dep STRING""".stripMargin,
-                """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
-                  |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
-                  .stripMargin)
-
-              // Source table has missing field 'b' and extra field 'c' in 
nested struct
-              val sourceTableSchema = StructType(Seq(
-                StructField("pk", IntegerType),
-                StructField("s", StructType(Seq(
-                  StructField("c1", IntegerType),
-                  StructField("c2", StructType(Seq(
-                    StructField("a", IntegerType),
-                    // missing field 'b'
-                    StructField("c", StringType) // extra field 'c'
-                  )))
-                ))),
-                StructField("dep", StringType)
-              ))
-
-              val data = Seq(
-                Row(1, null, "engineering"),
-                Row(2, null, "finance")
-              )
-              spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
-                .createOrReplaceTempView("source")
-
-              val schemaEvolutionClause = if (withSchemaEvolution) "WITH 
SCHEMA EVOLUTION" else ""
-              val mergeStmt =
-                s"""MERGE $schemaEvolutionClause
-                   |INTO $tableNameAsString t USING source
-                   |ON t.pk = source.pk
-                   |WHEN MATCHED THEN
-                   | UPDATE SET *
-                   |WHEN NOT MATCHED THEN
-                   | INSERT *
-                   |""".stripMargin
-
-              if (coerceNestedTypes && withSchemaEvolution) {
-                // extra nested field is added
-                sql(mergeStmt)
-                checkAnswer(
-                  sql(s"SELECT * FROM $tableNameAsString"),
-                  Seq(
-                    Row(0, Row(1, Row(10, "x", null)), "sales"),
-                    Row(1, null, "engineering"),
-                    Row(2, null, "finance")))
-              } else {
-                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
-                  sql(mergeStmt)
-                }
-                assert(exception.errorClass.get ==
-                  "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
-              }
-            }
-          }
-        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
-      }
-    }
-  }
-
-  test("merge null struct with non-nullable nested field - source with missing 
" +
-    "and extra nested fields") {
+  test("merge with struct of nulls with missing source field") {
     Seq(true, false).foreach { withSchemaEvolution =>
       Seq(true, false).foreach { coerceNestedTypes =>
         withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
           coerceNestedTypes.toString) {
           withTempView("source") {
+            // Target has struct with 3 fields
             createAndInitTable(
               s"""pk INT NOT NULL,
-                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
+                 |s STRUCT<c1: INT, c2: STRING, c3: INT>,
                  |dep STRING""".stripMargin,
-              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
-                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
+              """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": 
"sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": 20 }, "dep": "hr" 
}"""
                 .stripMargin)
 
+            // Source table has struct with 2 fields (missing field c3)
             val sourceTableSchema = StructType(Seq(
               StructField("pk", IntegerType),
               StructField("s", StructType(Seq(
                 StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType),
-                  StructField("c", StringType)
-                )))
+                StructField("c2", StringType)
+                // missing field c3
               ))),
               StructField("dep", StringType)
             ))
 
+            // Source has a struct with two null field values (not a null 
struct)
             val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
+              Row(1, Row(null, null), "engineering")
             )
             spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
               .createOrReplaceTempView("source")
 
             val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
             val mergeStmt =
-              s"""MERGE $schemaEvolutionClause
-                 |INTO $tableNameAsString t USING source
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
                  |ON t.pk = source.pk
                  |WHEN MATCHED THEN
                  | UPDATE SET *
@@ -5048,67 +4975,57 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                  | INSERT *
                  |""".stripMargin
 
-            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            if (coerceNestedTypes && withSchemaEvolution) {
               sql(mergeStmt)
+              // Struct of null values should be preserved, not converted to 
null struct
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, "a", 10), "sales"),
+                  Row(1, Row(null, null, 20), "engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
-            assert(exception.errorClass.get ==
-              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
-            assert(exception.getMessage.contains(
-              "Cannot find data for the output column `s`.`c2`.`b`"))
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
 
-  test("merge with null struct using default value") {
+  test("merge with struct of nulls with missing source field and null target 
field") {
     Seq(true, false).foreach { withSchemaEvolution =>
       Seq(true, false).foreach { coerceNestedTypes =>
         withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
           coerceNestedTypes.toString) {
           withTempView("source") {
-            sql(
-              s"""CREATE TABLE $tableNameAsString (
-                 | pk INT NOT NULL,
-                 | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
-                 |   named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 
'default')),
-                 | dep STRING)
-                 |PARTITIONED BY (dep)
-                 |""".stripMargin)
-
-            val initialSchema = StructType(Seq(
-              StructField("pk", IntegerType, nullable = false),
-              StructField("s", StructType(Seq(
-                StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType),
-                  StructField("b", StringType)
-                )))
-              ))),
-              StructField("dep", StringType)
-            ))
-            val initialData = Seq(
-              Row(0, Row(1, Row(10, "x")), "sales"),
-              Row(1, Row(2, Row(20, "y")), "hr")
-            )
-            spark.createDataFrame(spark.sparkContext.parallelize(initialData), 
initialSchema)
-              .writeTo(tableNameAsString).append()
+            // Target has struct with 3 fields, but row 1 has null for the 
extra field c3
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRING, c3: INT>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": 
"sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": null }, "dep": 
"hr" }"""
+                .stripMargin)
 
+            // Source table has struct with 2 fields (missing field c3)
             val sourceTableSchema = StructType(Seq(
               StructField("pk", IntegerType),
               StructField("s", StructType(Seq(
                 StructField("c1", IntegerType),
-                StructField("c2", StructType(Seq(
-                  StructField("a", IntegerType)
-                )))
+                StructField("c2", StringType)
+                // missing field c3
               ))),
               StructField("dep", StringType)
             ))
 
+            // Source has a struct with two null field values (not a null 
struct)
             val data = Seq(
-              Row(1, null, "engineering"),
-              Row(2, null, "finance")
+              Row(1, Row(null, null), "engineering")
             )
             spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
               .createOrReplaceTempView("source")
@@ -5125,25 +5042,871 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
 
             if (coerceNestedTypes && withSchemaEvolution) {
               sql(mergeStmt)
+              // Struct of null values should be preserved, not converted to a null struct
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
-                  Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
-                  Row(2, null, "finance")))
+                  Row(0, Row(1, "a", 10), "sales"),
+                  Row(1, Row(null, null, null), "engineering")))
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
               }
-              assert(exception.errorClass.get == 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
             }
           }
-          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
         }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
       }
     }
   }
 
+  test("merge with null struct - update field") {
+    withTempView("source") {
+      createAndInitTable(
+        s"""pk INT NOT NULL,
+           |s STRUCT<c1: INT, c2: STRING>,
+           |dep STRING""".stripMargin,
+        """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
+          |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+          .stripMargin)
+
+      // Source table matches target table schema
+      val sourceTableSchema = StructType(Seq(
+        StructField("pk", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)
+        ))),
+        StructField("dep", StringType)
+      ))
+
+      val data = Seq(
+        Row(1, null, "engineering"),
+        Row(2, null, "finance")
+      )
+      spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+        .createOrReplaceTempView("source")
+
+      sql(
+        s"""MERGE INTO $tableNameAsString t USING source
+           |ON t.pk = source.pk
+           |WHEN MATCHED THEN
+           | UPDATE SET s = source.s
+           |WHEN NOT MATCHED THEN
+           | INSERT *
+           |""".stripMargin)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(0, Row(1, "a"), "sales"),
+          Row(1, null, "hr"),
+          Row(2, null, "finance")))
+    }
+    sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+  }
+
+  test("merge with null nested struct in doubly nested struct") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+
+      withTempView("source") {
+        // Target has doubly nested struct with 2 fields in innermost struct
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "foo" } }, 
"dep": "sales" }
+            |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "bar" } }, 
"dep": "hr" }"""
+            .stripMargin)
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType),
+          StructField("s", StructType(Seq(
+            StructField("c1", IntegerType),
+            StructField("c2", StructType(Seq(
+              StructField("a", IntegerType),
+              StructField("b", StringType)
+            )))
+          ))),
+          StructField("dep", StringType)
+        ))
+
+        // Source has a row where the nested struct (c2) is null
+        val data = Seq(
+          Row(1, Row(3, null), "engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+             |ON t.pk = source.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin
+
+        sql(mergeStmt)
+        // Null nested struct should be preserved
+        checkAnswer(
+          sql(s"SELECT * FROM $tableNameAsString"),
+          Seq(
+            Row(0, Row(1, Row(10, "foo")), "sales"),
+            Row(1, Row(3, null), "engineering")))
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+
+  test("merge with null struct into non-nullable struct column") {
+    withTempView("source") {
+      createAndInitTable(
+        s"""pk INT NOT NULL,
+           |s STRUCT<c1: INT, c2: STRING> NOT NULL,
+           |dep STRING""".stripMargin,
+        """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
+          |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+          .stripMargin)
+
+      // Source table has null for the struct column
+      val sourceTableSchema = StructType(Seq(
+        StructField("pk", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)
+        ))),
+        StructField("dep", StringType)
+      ))
+
+      val data = Seq(
+        Row(1, null, "engineering"),
+        Row(2, null, "finance")
+      )
+      spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+        .createOrReplaceTempView("source")
+
+      // Should throw an exception when trying to insert/update null into a NOT NULL column
+      val exception = intercept[Exception] {
+        sql(
+          s"""MERGE INTO $tableNameAsString t USING source
+             |ON t.pk = source.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin)
+      }
+      assert(exception.getMessage.contains(
+        "NULL value appeared in non-nullable field"))
+    }
+    sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+  }
+
+  test("merge with with null struct with missing nested field") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target table has nested struct with fields c1 and c2
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
+                .stripMargin)
+
+            // Source rows have a null struct 's'; the source schema omits nested field 'b'
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType)
+                  // missing field 'b'
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x")), "sales"),
+                  Row(1, Row(null, Row(null, "y")), "engineering"),
+                  Row(2, null, "finance")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge with null source struct with extra target source field being 
null") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target table has nested struct; row 1 has null for field 'b' (missing in source)
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": null } }, 
"dep": "hr" }"""
+                .stripMargin)
+
+            // Source table has struct with missing nested field 'b'
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType)
+                  // missing field 'b'
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // It's not immediately obvious, but because the target had extra fields,
+              // we preserve them despite them being null (and thus retain the struct of nulls)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x")), "sales"),
+                  Row(1, Row(null, Row(null, null)), "engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge with null source struct with extra target field in doubly nested 
struct") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct nested in struct, with extra field 'y' in innermost struct
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StructType(Seq(
+                  StructField("x", IntegerType),
+                  StructField("y", StringType)
+                )))
+              )))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+              Row(1, Row(2, Row(20, Row(200, null))), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in innermost struct
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StructType(Seq(
+                    StructField("x", IntegerType)
+                    // missing field 'y'
+                  )))
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target had extra field 'y' which is null,
+              // we preserve it and retain the struct of nulls
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+                  Row(1, Row(null, Row(null, Row(null, null))), 
"engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge with null source and target nested struct with extra target 
field") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct nested in struct, with extra field 'y' in innermost struct
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StructType(Seq(
+                  StructField("x", IntegerType),
+                  StructField("y", StringType)
+                )))
+              )))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            // Target data has null for innermost struct 'b', which has the extra field 'y'
+            val targetData = Seq(
+              Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+              Row(1, Row(2, Row(20, null)), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in innermost struct
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StructType(Seq(
+                    StructField("x", IntegerType)
+                    // missing field 'y'
+                  )))
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            // Source data also has null for innermost struct 'b'
+            val data = Seq(
+              Row(1, Row(3, Row(30, null)), "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Both source and target have null for 'b', which should remain null
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+                  Row(1, Row(3, Row(30, null)), "engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge with null source struct with extra target field in struct inside 
array") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct with array of structs, with extra field 'y' in array element struct
+            val arrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType),
+              StructField("y", StringType)
+            ))
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("arr", ArrayType(arrayElementSchema))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), "sales"),
+              Row(1, Row(2, Seq(Row(200, null), Row(201, null))), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in array element struct
+            val sourceArrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+              // missing field 'y'
+            ))
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("arr", ArrayType(sourceArrayElementSchema))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target's extra field 'y' is within an array element,
+              // it cannot be referenced, so we do not preserve it and allow the source null
+              // to override it.
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), 
"sales"),
+                  Row(1, null, "engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge with null source struct with extra null target field in struct 
containing array") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            val arrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+            ))
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("arr", ArrayType(arrayElementSchema)),
+              StructField("c2", StringType) // extra field at nested struct 
level
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"),
+              Row(1, Row(2, Seq(Row(200), Row(201)), null), "hr") // c2 is null
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), 
targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct missing field 'c2'
+            val sourceArrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+            ))
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("arr", ArrayType(sourceArrayElementSchema))
+                // missing field 'c2'
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING 
source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target had extra field 'c2' which is null,
+              // we preserve it and retain the struct of nulls
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"),
+                  Row(1, Row(null, null, null), "engineering")))
+            } else {
+              val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get ==
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge null struct with schema evolution - source with missing and 
extra nested fields") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+            withTempView("source") {
+              // Target table has nested struct with fields c1 and c2
+              createAndInitTable(
+                s"""pk INT NOT NULL,
+                   |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                   |dep STRING""".stripMargin,
+                """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, 
"dep": "sales" }
+                  |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, 
"dep": "hr" }"""
+                  .stripMargin)
+
+              // Source table is missing field 'b' and has extra field 'c' in nested struct
+              val sourceTableSchema = StructType(Seq(
+                StructField("pk", IntegerType),
+                StructField("s", StructType(Seq(
+                  StructField("c1", IntegerType),
+                  StructField("c2", StructType(Seq(
+                    StructField("a", IntegerType),
+                    // missing field 'b'
+                    StructField("c", StringType) // extra field 'c'
+                  )))
+                ))),
+                StructField("dep", StringType)
+              ))
+
+              val data = Seq(
+                Row(1, null, "engineering"),
+                Row(2, null, "finance")
+              )
+              spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+                .createOrReplaceTempView("source")
+
+              val schemaEvolutionClause = if (withSchemaEvolution) "WITH 
SCHEMA EVOLUTION" else ""
+              val mergeStmt =
+                s"""MERGE $schemaEvolutionClause
+                   |INTO $tableNameAsString t USING source
+                   |ON t.pk = source.pk
+                   |WHEN MATCHED THEN
+                   | UPDATE SET *
+                   |WHEN NOT MATCHED THEN
+                   | INSERT *
+                   |""".stripMargin
+
+              if (coerceNestedTypes && withSchemaEvolution) {
+                // extra nested field is added
+                sql(mergeStmt)
+                checkAnswer(
+                  sql(s"SELECT * FROM $tableNameAsString"),
+                  Seq(
+                    Row(0, Row(1, Row(10, "x", null)), "sales"),
+                    Row(1, Row(null, Row(null, "y", null)), "engineering"),
+                    Row(2, null, "finance")))
+              } else {
+                val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
+                  sql(mergeStmt)
+                }
+                assert(exception.errorClass.get ==
+                  "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+              }
+            }
+          }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // The target declares `s.c2.b` as NOT NULL, while the source struct `s.c2` carries
+  // `a` and `c` but no `b`. Every combination of schema evolution and nested type
+  // coercion is expected to fail analysis with CANNOT_FIND_DATA for `s`.`c2`.`b`.
+  test("merge null struct with non-nullable nested field - source with missing " +
+    "and extra nested fields") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target table: two seed rows, one JSON record per line.
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep": "hr" }"""
+                .stripMargin)
+
+            // Source schema: `s.c2` is missing `b` and has the extra field `c`.
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("c", StringType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            // Both source rows carry a null struct for `s`.
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause
+                 |INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+              sql(mergeStmt)
+            }
+            assert(exception.errorClass.get ==
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            assert(exception.getMessage.contains(
+              "Cannot find data for the output column `s`.`c2`.`b`"))
+          }
+          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+        }
+      }
+    }
+  }
+
+  // The target column `s` has a DEFAULT struct value; the source struct `s.c2` lacks
+  // field `b` and both source rows carry a null `s`. The merge is expected to succeed
+  // only when nested type coercion and schema evolution are both enabled; every other
+  // combination must fail analysis with CANNOT_FIND_DATA.
+  test("merge with null struct using default value") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+          coerceNestedTypes.toString) {
+          withTempView("source") {
+            sql(
+              s"""CREATE TABLE $tableNameAsString (
+                 | pk INT NOT NULL,
+                 | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+                 |   named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
+                 | dep STRING)
+                 |PARTITIONED BY (dep)
+                 |""".stripMargin)
+
+            // Seed the target with two fully populated rows.
+            val initialSchema = StructType(Seq(
+              StructField("pk", IntegerType, nullable = false),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StringType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+            val initialData = Seq(
+              Row(0, Row(1, Row(10, "x")), "sales"),
+              Row(1, Row(2, Row(20, "y")), "hr")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source schema: `s.c2` only has `a` (no `b`).
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Matched row pk=1 keeps its existing `s.c2.b` ("y") while `c1` and `c2.a`
+              // are null; unmatched pk=2 is inserted with a null struct.
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x")), "sales"),
+                  Row(1, Row(null, Row(null, "y")), "engineering"),
+                  Row(2, null, "finance")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+            }
+          }
+          sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+        }
+      }
+    }
+  }
 
   test("merge with source missing struct column with default value") {
     withTempView("source") {
@@ -5258,8 +6021,8 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
-                  Row(1, Row(10, Row(20, null)), "sales"),
-                  Row(2, Row(20, Row(30, null)), "engineering")))
+                  Row(1, Row(10, Row(20, true)), "sales"),
+                  Row(2, Row(20, Row(30, false)), "engineering")))
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
                 sql(mergeStmt)
@@ -5918,7 +6681,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
               checkAnswer(
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
-                  Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+                  Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), 
"sales"),
                   Row(2, Row(20, Row(null, Map("e" -> "f"), true)), 
"engineering")))
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
@@ -6267,7 +7030,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                 sql(s"SELECT * FROM $tableNameAsString"),
                 Seq(
                   Row(0, Row(1, Row(10, "x")), "sales"),
-                  Row(1, null, "engineering"),
+                  Row(1, Row(null, Row(null, "y")), "engineering"),
                   Row(2, null, "finance")))
             } else {
               val exception = 
intercept[org.apache.spark.sql.AnalysisException] {
@@ -6371,7 +7134,7 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
                   sql(s"SELECT * FROM $tableNameAsString"),
                   Seq(
                     Row(0, Row(1, Row(10, "x", null)), "sales"),
-                    Row(1, null, "engineering"),
+                    Row(1, Row(null, Row(null, "y", null)), "engineering"),
                     Row(2, null, "finance")))
               } else {
                 val exception = 
intercept[org.apache.spark.sql.AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to