This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new aedd42be645e [SPARK-54621][SQL] Merge Into Update Set * preserve
nested fields if coerceNestedTypes is enabled
aedd42be645e is described below
commit aedd42be645e7e7492d240989d9325e750ddb843
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Dec 18 15:43:25 2025 +0800
[SPARK-54621][SQL] Merge Into Update Set * preserve nested fields if
coerceNestedTypes is enabled
### What changes were proposed in this pull request?
The 'struct coercion' feature for MERGE INTO (which allows assigning a struct
with fewer fields to a struct with more fields) was turned off behind a flag
in https://github.com/apache/spark/pull/53229 due to some ambiguity in its
behavior, but was not removed because the community wanted to try it.
We still want to keep it behind the flag, but we need to choose which
behavior to support when the flag is on. In particular, we want UPDATE SET *
to expand to all nested struct fields, so that in this scenario existing
nested struct fields that are missing from the source are preserved.
### Why are the changes needed?
aokolnychyi tested the feature and thinks that, even though it is behind an
experimental flag, we should take the stance for now that UPDATE SET * should
expand to all nested fields rather than only to top-level columns.
The rationale is:
* it's always safer not to overwrite user values with null
* Spark in general tries to treat nested fields like columns
* there's already a way for the user to overwrite the whole struct (and set
fields missing from the source to null) by assigning the struct explicitly,
i.e. UPDATE SET struct = source.struct
### Does this PR introduce _any_ user-facing change?
No, the whole feature is new and hidden behind an experimental flag.
### How was this patch tested?
Existing tests (some expected outputs changed from null to the preserved, non-null values)
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53360 from szehon-ho/SPARK-54621.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 92e5b3644d567392024464e4b3bd5f28fe69a550)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/AssignmentUtils.scala | 163 ++-
.../sql/connector/MergeIntoTableSuiteBase.scala | 1111 +++++++++++++++++---
2 files changed, 1091 insertions(+), 183 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 6c7b0626e81e..df4b0646ed42 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -19,15 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.SQLConfHelper
import
org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE,
RECURSE}
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct, Expression, GetStructField, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute,
CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.ArrayImplicits._
@@ -72,7 +74,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport
{
assignments,
addError = err => errors += err,
colPath = Seq(attr.name),
- coerceNestedTypes)
+ coerceNestedTypes,
+ fromStar)
}
if (errors.nonEmpty) {
@@ -156,7 +159,8 @@ object AssignmentUtils extends SQLConfHelper with
CastSupport {
assignments: Seq[Assignment],
addError: String => Unit,
colPath: Seq[String],
- coerceNestedTypes: Boolean = false): Expression = {
+ coerceNestedTypes: Boolean = false,
+ updateStar: Boolean = false): Expression = {
val (exactAssignments, otherAssignments) = assignments.partition {
assignment =>
assignment.key.semanticEquals(colExpr)
@@ -178,11 +182,25 @@ object AssignmentUtils extends SQLConfHelper with
CastSupport {
} else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) {
TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
} else if (exactAssignments.nonEmpty) {
- val value = exactAssignments.head.value
- val coerceMode = if (coerceNestedTypes) RECURSE else NONE
- val resolvedValue = TableOutputResolver.resolveUpdate("", value, col,
conf, addError,
- colPath, coerceMode)
- resolvedValue
+ if (updateStar && SQLConf.get.coerceMergeNestedTypes) {
+ val value = exactAssignments.head.value
+ col.dataType match {
+ case _: StructType =>
+ // Expand assignments to leaf fields (fixNullExpansion is applied
inside)
+ applyNestedFieldAssignments(col, colExpr, value, addError, colPath,
+ coerceNestedTypes)
+ case _ =>
+ // For non-struct types, resolve directly
+ val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+ TableOutputResolver.resolveUpdate("", value, col, conf, addError,
colPath,
+ coerceMode)
+ }
+ } else {
+ val value = exactAssignments.head.value
+ val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+ TableOutputResolver.resolveUpdate("", value, col, conf, addError,
+ colPath, coerceMode)
+ }
} else {
applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath,
coerceNestedTypes)
}
@@ -211,7 +229,67 @@ object AssignmentUtils extends SQLConfHelper with
CastSupport {
case otherType =>
addError(
"Updating nested fields is only supported for StructType but " +
- s"'${colPath.quoted}' is of type $otherType")
+ s"'${colPath.quoted}' is of type $otherType")
+ colExpr
+ }
+ }
+
+ private def applyNestedFieldAssignments(
+ col: Attribute,
+ colExpr: Expression,
+ value: Expression,
+ addError: String => Unit,
+ colPath: Seq[String],
+ coerceNestedTypes: Boolean): Expression = {
+
+ col.dataType match {
+ case structType: StructType =>
+ val fieldAttrs = DataTypeUtils.toAttributes(structType)
+
+ val updatedFieldExprs = fieldAttrs.zipWithIndex.map { case (fieldAttr,
ordinal) =>
+ val fieldPath = colPath :+ fieldAttr.name
+ val targetFieldExpr = GetStructField(colExpr, ordinal,
Some(fieldAttr.name))
+
+ // Try to find a corresponding field in the source value by name
+ val sourceFieldValue: Expression = value.dataType match {
+ case valueStructType: StructType =>
+ valueStructType.fields.find(f => conf.resolver(f.name,
fieldAttr.name)) match {
+ case Some(matchingField) =>
+ // Found matching field in source, extract it
+ val fieldIndex =
valueStructType.fieldIndex(matchingField.name)
+ GetStructField(value, fieldIndex, Some(matchingField.name))
+ case None =>
+ // Field doesn't exist in source, use target's current value
with null check
+ TableOutputResolver.checkNullability(targetFieldExpr,
fieldAttr, conf, fieldPath)
+ }
+ case _ =>
+ // Value is not a struct, cannot extract field
+ addError(s"Cannot assign non-struct value to struct field
'${fieldPath.quoted}'")
+ Literal(null, fieldAttr.dataType)
+ }
+
+ // Recurse or resolve based on field type
+ fieldAttr.dataType match {
+ case _: StructType =>
+ // Field is a struct, recurse
+ applyNestedFieldAssignments(fieldAttr, targetFieldExpr,
+ sourceFieldValue, addError, fieldPath, coerceNestedTypes)
+ case _ =>
+ // Field is not a struct, resolve with TableOutputResolver
+ val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+ TableOutputResolver.resolveUpdate("", sourceFieldValue,
fieldAttr, conf, addError,
+ fieldPath, coerceMode)
+ }
+ }
+ val namedStruct = toNamedStruct(structType, updatedFieldExprs)
+
+ // Prevent unnecessary null struct expansion
+ fixNullExpansion(colExpr, value, structType, namedStruct, colPath)
+
+ case otherType =>
+ addError(
+ "Updating nested fields is only supported for StructType but " +
+ s"'${colPath.quoted}' is of type $otherType")
colExpr
}
}
@@ -223,6 +301,73 @@ object AssignmentUtils extends SQLConfHelper with
CastSupport {
CreateNamedStruct(namedStructExprs)
}
+ /**
+ * Checks if target struct has extra fields compared to source struct,
recursively.
+ */
+ private def hasExtraTargetFields(targetType: StructType, sourceType:
DataType): Boolean = {
+ sourceType match {
+ case sourceStructType: StructType =>
+ targetType.fields.exists { targetField =>
+ sourceStructType.fields.find(f => conf.resolver(f.name,
targetField.name)) match {
+ case Some(sourceField) =>
+ // Check nested structs recursively
+ (targetField.dataType, sourceField.dataType) match {
+ case (targetNested: StructType, sourceNested) =>
+ hasExtraTargetFields(targetNested, sourceNested)
+ case _ => false
+ }
+ case None => true // target has extra field not in source
+ }
+ }
+ case _ =>
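+ // Expected to be rejected earlier. NOTE(review): applyNestedFieldAssignments only calls addError for a non-struct value and then still reaches here via fixNullExpansion when the key is nullable, turning a user error into an internal error — confirm or guard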
+ throw SparkException.internalError(
+ s"Source type must be StructType but found: $sourceType")
+ }
+ }
+
+ /**
+ * As UPDATE SET * assigns struct fields individually (preserving existing fields),
+ * this would otherwise lead to indiscriminate null expansion, i.e. a struct whose
+ * fields are all null is created even when the source struct is null. Wraps a struct
+ * assignment with a condition that returns null if both conditions are true:
+ *
+ * - source struct is null
+ * - target struct is null OR the target type has no extra fields compared to the source (checked recursively)
+ *
+ * Otherwise the expanded struct is kept. This includes cases where the source was
+ * a struct of nulls, or where the target has extra fields (even null-valued ones)
+ * and the target struct itself is not null; both retain a struct of nulls.
+ * If the key is not nullable, the struct expression is returned unchanged.
+ *
+ * @param key the original assignment key (target struct) expression
+ * @param value the original assignment value (source struct) expression
+ * @param structType the target struct type
+ * @param structExpression the expanded CreateNamedStruct expression to wrap
+ * @param colPath the column path (currently unused by this method)
+ * @return the wrapped expression with null checks
+ */
+ private def fixNullExpansion(
+ key: Expression,
+ value: Expression,
+ structType: StructType,
+ structExpression: Expression,
+ colPath: Seq[String]): Expression = {
+ if (key.nullable) {
+ val condition = if (hasExtraTargetFields(structType, value.dataType)) {
+ // extra target fields: return null iff source struct is null and
target struct is null
+ And(IsNull(value), IsNull(key))
+ } else {
+ // schemas match: return null iff source struct is null
+ IsNull(value)
+ }
+
+ If(condition, Literal(null, structExpression.dataType), structExpression)
+ } else {
+ structExpression
+ }
+ }
+
/**
* Checks whether assignments are aligned and compatible with table columns.
*
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 5d1173b5a1a5..ea19a916b7cc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -3240,9 +3240,8 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+ Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)),
"sales"),
Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
-
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
@@ -4776,8 +4775,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
-
- test("merge with null struct - update field") {
+ test("merge with struct of nulls") {
withTempView("source") {
createAndInitTable(
s"""pk INT NOT NULL,
@@ -4797,9 +4795,10 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
StructField("dep", StringType)
))
+ // Source has a struct with null field values (not a null struct)
val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
+ Row(1, Row(null, null), "engineering"),
+ Row(2, Row(null, null), "finance")
)
spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
.createOrReplaceTempView("source")
@@ -4808,31 +4807,32 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
s"""MERGE INTO $tableNameAsString t USING source
|ON t.pk = source.pk
|WHEN MATCHED THEN
- | UPDATE SET s = source.s
+ | UPDATE SET *
|WHEN NOT MATCHED THEN
| INSERT *
|""".stripMargin)
+ // Struct of null values should be preserved, not converted to null
struct
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
Row(0, Row(1, "a"), "sales"),
- Row(1, null, "hr"),
- Row(2, null, "finance")))
+ Row(1, Row(null, null), "engineering"),
+ Row(2, Row(null, null), "finance")))
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- test("merge with null struct into non-nullable struct column") {
+ test("merge with null struct into struct of nulls") {
withTempView("source") {
createAndInitTable(
s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING> NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING>,
|dep STRING""".stripMargin,
"""{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+ |{ "pk": 1, "s": { "c1": null, "c2": null }, "dep": "hr" }"""
.stripMargin)
- // Source table has null for the struct column
+ // Source table matches target table schema
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType),
StructField("s", StructType(Seq(
@@ -4842,61 +4842,60 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
StructField("dep", StringType)
))
+ // Source has a null struct (not a struct of nulls)
val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
+ Row(1, null, "engineering")
)
spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
.createOrReplaceTempView("source")
- // Should throw an exception when trying to insert/update null into NOT
NULL column
- val exception = intercept[Exception] {
- sql(
- s"""MERGE INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
- }
- assert(exception.getMessage.contains(
- "NULL value appeared in non-nullable field"))
+ sql(
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin)
+ // Null struct should override struct of nulls
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a"), "sales"),
+ Row(1, null, "engineering")))
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- test("merge with with null struct with missing nested field") {
+ test("merge with null struct into struct of nulls with extra target field") {
Seq(true, false).foreach { withSchemaEvolution =>
Seq(true, false).foreach { coerceNestedTypes =>
withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
coerceNestedTypes.toString) {
withTempView("source") {
- // Target table has nested struct with fields c1 and c2
+ // Target has struct with 3 fields, row 1 has all nulls including
extra field c3
createAndInitTable(
s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |s STRUCT<c1: INT, c2: STRING, c3: INT>,
|dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep":
"sales" }
+ |{ "pk": 1, "s": { "c1": null, "c2": null, "c3": null },
"dep": "hr" }"""
.stripMargin)
- // Source table has null for the nested struct
+ // Source table has struct with 2 fields (missing c3)
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType),
StructField("s", StructType(Seq(
StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
+ StructField("c2", StringType)
+ // missing field c3
))),
StructField("dep", StringType)
))
+ // Source has a null struct (not a struct of nulls)
val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
+ Row(1, null, "engineering")
)
spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
.createOrReplaceTempView("source")
@@ -4913,12 +4912,12 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Because target has extra field c3, we preserve struct of nulls
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
+ Row(0, Row(1, "a", 10), "sales"),
+ Row(1, Row(null, null, null), "engineering")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
@@ -4933,114 +4932,42 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
}
- test("merge null struct with schema evolution - source with missing and
extra nested fields") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has nested struct with fields c1 and c2
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
- .stripMargin)
-
- // Source table has missing field 'b' and extra field 'c' in
nested struct
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- // missing field 'b'
- StructField("c", StringType) // extra field 'c'
- )))
- ))),
- StructField("dep", StringType)
- ))
-
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
-
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH
SCHEMA EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
-
- if (coerceNestedTypes && withSchemaEvolution) {
- // extra nested field is added
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- }
- }
- }
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
- }
- }
- }
-
- test("merge null struct with non-nullable nested field - source with missing
" +
- "and extra nested fields") {
+ test("merge with struct of nulls with missing source field") {
Seq(true, false).foreach { withSchemaEvolution =>
Seq(true, false).foreach { coerceNestedTypes =>
withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
coerceNestedTypes.toString) {
withTempView("source") {
+ // Target has struct with 3 fields
createAndInitTable(
s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
+ |s STRUCT<c1: INT, c2: STRING, c3: INT>,
|dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep":
"sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": 20 }, "dep": "hr"
}"""
.stripMargin)
+ // Source table has struct with 2 fields (missing field c3)
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType),
StructField("s", StructType(Seq(
StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("c", StringType)
- )))
+ StructField("c2", StringType)
+ // missing field c3
))),
StructField("dep", StringType)
))
+ // Source has a struct with two null field values (not a null
struct)
val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
+ Row(1, Row(null, null), "engineering")
)
spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
.createOrReplaceTempView("source")
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t USING source
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
|ON t.pk = source.pk
|WHEN MATCHED THEN
| UPDATE SET *
@@ -5048,67 +4975,57 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Missing source field c3 keeps the target's existing value (20), so the struct is not nulled
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", 10), "sales"),
+ Row(1, Row(null, null, 20), "engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `s`.`c2`.`b`"))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
- test("merge with null struct using default value") {
+ test("merge with struct of nulls with missing source field and null target
field") {
Seq(true, false).foreach { withSchemaEvolution =>
Seq(true, false).foreach { coerceNestedTypes =>
withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
coerceNestedTypes.toString) {
withTempView("source") {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- | pk INT NOT NULL,
- | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
- | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b',
'default')),
- | dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
-
- val initialSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- val initialData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(initialData),
initialSchema)
- .writeTo(tableNameAsString).append()
+ // Target has struct with 3 fields, but row 1 has null for the
extra field c3
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: INT>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep":
"sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": null }, "dep":
"hr" }"""
+ .stripMargin)
+ // Source table has struct with 2 fields (missing field c3)
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType),
StructField("s", StructType(Seq(
StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- )))
+ StructField("c2", StringType)
+ // missing field c3
))),
StructField("dep", StringType)
))
+ // Source has a struct with two null field values (not a null
struct)
val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
+ Row(1, Row(null, null), "engineering")
)
spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
.createOrReplaceTempView("source")
@@ -5125,25 +5042,871 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Struct of null values should be preserved, not converted to
null struct
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
+ Row(0, Row(1, "a", 10), "sales"),
+ Row(1, Row(null, null, null), "engineering")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
+ test("merge with null struct - update field") {
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+ .stripMargin)
+
+ // Source table matches target table schema
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)
+ ))),
+ StructField("dep", StringType)
+ ))
+
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ sql(
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET s = source.s
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a"), "sales"),
+ Row(1, null, "hr"),
+ Row(2, null, "finance")))
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
+
+ test("merge with null nested struct in doubly nested struct") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+
+ withTempView("source") {
+ // Target has doubly nested struct with 2 fields in innermost struct
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "foo" } },
"dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "bar" } },
"dep": "hr" }"""
+ .stripMargin)
+
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+
+ // Source has a row where the nested struct (c2) is null
+ val data = Seq(
+ Row(1, Row(3, null), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ sql(mergeStmt)
+ // Null nested struct should be preserved
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "foo")), "sales"),
+ Row(1, Row(3, null), "engineering")))
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
+ }
+
+
+ test("merge with null struct into non-nullable struct column") {
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING> NOT NULL,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }"""
+ .stripMargin)
+
+ // Source table has null for the struct column
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)
+ ))),
+ StructField("dep", StringType)
+ ))
+
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ // Should throw an exception when trying to insert/update null into NOT
NULL column
+ val exception = intercept[Exception] {
+ sql(
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin)
+ }
+ assert(exception.getMessage.contains(
+ "NULL value appeared in non-nullable field"))
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
+
+ test("merge with with null struct with missing nested field") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has nested struct with fields c1 and c2
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ .stripMargin)
+
+ // Source table has null for struct s; its nested struct c2 is missing field 'b'
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ // missing field 'b'
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(null, Row(null, "y")), "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ }
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
+ }
+ }
+
+  // Source struct `s` is entirely null and its nested struct `c2` lacks field `b`, which the
+  // target has (and which is already null in the matched target row pk 1). Only with nested
+  // type coercion AND schema evolution enabled is the MERGE expected to succeed; the matched
+  // row then becomes a struct of nulls rather than a null struct. Every other combination is
+  // expected to fail with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null source struct with extra target source field being null") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target table has nested struct, row 1 has null for field 'b' (missing in source)
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": null } }, "dep": "hr" }"""
+                .stripMargin)
+
+            // Source table has struct with missing nested field 'b'
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType)
+                  // missing field 'b'
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // It's not immediately obvious, but because the target had extra fields
+              // we preserve them despite them being null (and thus retain the struct of nulls)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x")), "sales"),
+                  Row(1, Row(null, Row(null, null)), "engineering")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // The target's innermost struct `s.c2.b` carries an extra field `y` that the source lacks,
+  // and the source row for pk 1 has a null `s`. With nested type coercion and schema evolution
+  // both enabled, the matched row is expected to become a fully nested struct of nulls;
+  // any other combination is expected to fail with
+  // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null source struct with extra target field in doubly nested struct") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct nested in struct, with extra field 'y' in innermost struct
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StructType(Seq(
+                  StructField("x", IntegerType),
+                  StructField("y", StringType)
+                )))
+              )))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+              Row(1, Row(2, Row(20, Row(200, null))), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in innermost struct
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StructType(Seq(
+                    StructField("x", IntegerType)
+                    // missing field 'y'
+                  )))
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target had extra field 'y' which is null,
+              // we preserve it and retain the struct of nulls
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+                  Row(1, Row(null, Row(null, Row(null, null))), "engineering")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // For pk 1 both the target and the source hold a null innermost struct `s.c2.b`, and the
+  // target's `b` has an extra field `y` the source lacks. With nested type coercion and schema
+  // evolution both enabled, `b` is expected to stay null (not turn into a struct of nulls)
+  // while `c1` and `c2.a` take the source values; any other combination is expected to fail
+  // with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null source and target nested struct with extra target field") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct nested in struct, with extra field 'y' in innermost struct
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StructType(Seq(
+                StructField("a", IntegerType),
+                StructField("b", StructType(Seq(
+                  StructField("x", IntegerType),
+                  StructField("y", StringType)
+                )))
+              )))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            // Target data has null for innermost struct 'b' which has the extra field 'y'
+            val targetData = Seq(
+              Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+              Row(1, Row(2, Row(20, null)), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in innermost struct
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StructType(Seq(
+                    StructField("x", IntegerType)
+                    // missing field 'y'
+                  )))
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            // Source data also has null for innermost struct 'b'
+            val data = Seq(
+              Row(1, Row(3, Row(30, null)), "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Both source and target have null for 'b', which should remain null
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"),
+                  Row(1, Row(3, Row(30, null)), "engineering")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // The extra target field `y` lives in the element struct of array `s.arr`, and the source
+  // row for pk 1 has a null `s`. With nested type coercion and schema evolution both enabled,
+  // the matched row's whole `s` is expected to become null (fields inside an array element
+  // cannot be preserved individually); any other combination is expected to fail with
+  // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null source struct with extra target field in struct inside array") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target has struct with array of structs, with extra field 'y' in array element struct
+            val arrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType),
+              StructField("y", StringType)
+            ))
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("arr", ArrayType(arrayElementSchema))
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), "sales"),
+              Row(1, Row(2, Seq(Row(200, null), Row(201, null))), "hr")
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct with missing field 'y' in array element struct
+            val sourceArrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+              // missing field 'y'
+            ))
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("arr", ArrayType(sourceArrayElementSchema))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target had extra field 'y' which is within an array,
+              // it cannot be referenced and so we do not preserve it and allow source null
+              // to override it.
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), "sales"),
+                  Row(1, null, "engineering")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // The target struct `s` has an extra field `c2` (next to an array of structs) that the
+  // source lacks; target row pk 1 has a null `c2` and the source row for pk 1 has a null `s`.
+  // With nested type coercion and schema evolution both enabled, the matched row is expected
+  // to become a struct of nulls; any other combination is expected to fail with
+  // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null source struct with extra null target field in struct containing array") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            val arrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+            ))
+            val targetTableSchema = StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("arr", ArrayType(arrayElementSchema)),
+              StructField("c2", StringType) // extra field at nested struct level
+            ))
+
+            val columns = Array(
+              Column.create("pk", IntegerType, false),
+              Column.create("s", targetTableSchema),
+              Column.create("dep", StringType))
+            createTable(columns)
+
+            val targetData = Seq(
+              Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"),
+              Row(1, Row(2, Seq(Row(200), Row(201)), null), "hr") // c2 is null
+            )
+            val targetDataSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", targetTableSchema),
+              StructField("dep", StringType)
+            ))
+            spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema)
+              .writeTo(tableNameAsString).append()
+
+            // Source has struct missing field 'c2'
+            val sourceArrayElementSchema = StructType(Seq(
+              StructField("x", IntegerType)
+            ))
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("arr", ArrayType(sourceArrayElementSchema))
+                // missing field 'c2'
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              // Because the target had extra field 'c2' which is null,
+              // we preserve it and retain the struct of nulls
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"),
+                  Row(1, Row(null, null, null), "engineering")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // Source `s.c2` drops the target's field `b` and adds a new field `c`; both source rows have
+  // a null `s`. With nested type coercion and schema evolution both enabled, `c` is expected
+  // to be added to the target, the matched row pk 1 keeps its existing `b` value ("y") and the
+  // inserted row pk 2 gets a null `s`. Any other combination is expected to fail with
+  // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge null struct with schema evolution - source with missing and extra nested fields") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            // Target table has nested struct with fields c1 and c2
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep": "hr" }"""
+                .stripMargin)
+
+            // Source table has missing field 'b' and extra field 'c' in nested struct
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  // missing field 'b'
+                  StructField("c", StringType) // extra field 'c'
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause
+                 |INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              // extra nested field is added
+              sql(mergeStmt)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x", null)), "sales"),
+                  Row(1, Row(null, Row(null, "y", null)), "engineering"),
+                  Row(2, null, "finance")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // Target field `s.c2.b` is declared NOT NULL, while source `s.c2` drops `b` and adds `c`;
+  // both source rows have a null `s`. Every combination of schema evolution and nested type
+  // coercion is expected to fail with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA, and the
+  // message is expected to name the output column `s`.`c2`.`b`.
+  test("merge null struct with non-nullable nested field - source with missing " +
+      "and extra nested fields") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            createAndInitTable(
+              s"""pk INT NOT NULL,
+                 |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
+                 |dep STRING""".stripMargin,
+              """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" }
+                |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep": "hr" }"""
+                .stripMargin)
+
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("c", StringType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause
+                 |INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            val exception = intercept[org.apache.spark.sql.AnalysisException] {
+              sql(mergeStmt)
+            }
+            // Option.contains reports the actual condition on mismatch instead of
+            // throwing NoSuchElementException when no error class is set.
+            assert(exception.errorClass.contains(
+              "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            assert(exception.getMessage.contains(
+              "Cannot find data for the output column `s`.`c2`.`b`"))
+          }
+        }
+        // Dropped outside withSQLConf, matching the other tests in this suite.
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
+
+  // Target column `s` declares a DEFAULT struct; source `s.c2` lacks field `b` and both source
+  // rows have a null `s`. With nested type coercion and schema evolution both enabled, the
+  // matched row pk 1 is expected to keep its existing `b` ("y"), and the inserted row pk 2 is
+  // expected to get a null `s` (not the column default). Any other combination is expected to
+  // fail with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA.
+  test("merge with null struct using default value") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      Seq(true, false).foreach { coerceNestedTypes =>
+        withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+            coerceNestedTypes.toString) {
+          withTempView("source") {
+            sql(
+              s"""CREATE TABLE $tableNameAsString (
+                 | pk INT NOT NULL,
+                 | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+                 |   named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
+                 | dep STRING)
+                 |PARTITIONED BY (dep)
+                 |""".stripMargin)
+
+            val initialSchema = StructType(Seq(
+              StructField("pk", IntegerType, nullable = false),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType),
+                  StructField("b", StringType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+            val initialData = Seq(
+              Row(0, Row(1, Row(10, "x")), "sales"),
+              Row(1, Row(2, Row(20, "y")), "hr")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
+              .writeTo(tableNameAsString).append()
+
+            val sourceTableSchema = StructType(Seq(
+              StructField("pk", IntegerType),
+              StructField("s", StructType(Seq(
+                StructField("c1", IntegerType),
+                StructField("c2", StructType(Seq(
+                  StructField("a", IntegerType)
+                )))
+              ))),
+              StructField("dep", StringType)
+            ))
+
+            val data = Seq(
+              Row(1, null, "engineering"),
+              Row(2, null, "finance")
+            )
+            spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+              .createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source
+                 |ON t.pk = source.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET *
+                 |WHEN NOT MATCHED THEN
+                 | INSERT *
+                 |""".stripMargin
+
+            if (coerceNestedTypes && withSchemaEvolution) {
+              sql(mergeStmt)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString"),
+                Seq(
+                  Row(0, Row(1, Row(10, "x")), "sales"),
+                  Row(1, Row(null, Row(null, "y")), "engineering"),
+                  Row(2, null, "finance")))
+            } else {
+              val exception = intercept[org.apache.spark.sql.AnalysisException] {
+                sql(mergeStmt)
+              }
+              // Option.contains reports the actual condition on mismatch instead of
+              // throwing NoSuchElementException when no error class is set.
+              assert(exception.errorClass.contains(
+                "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
+            }
+          }
+        }
+        // Dropped outside withSQLConf, matching the other tests in this suite.
+        sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+      }
+    }
+  }
test("merge with source missing struct column with default value") {
withTempView("source") {
@@ -5258,8 +6021,8 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(1, Row(10, Row(20, null)), "sales"),
- Row(2, Row(20, Row(30, null)), "engineering")))
+ Row(1, Row(10, Row(20, true)), "sales"),
+ Row(2, Row(20, Row(30, false)), "engineering")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
@@ -5918,7 +6681,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+ Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)),
"sales"),
Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
@@ -6267,7 +7030,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
+ Row(1, Row(null, Row(null, "y")), "engineering"),
Row(2, null, "finance")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
@@ -6371,7 +7134,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
+ Row(1, Row(null, Row(null, "y", null)), "engineering"),
Row(2, null, "finance")))
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]