This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 22e5d886f8dd Revert "[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause"
22e5d886f8dd is described below
commit 22e5d886f8dd09bf21479aea6b9b7f91ca67730a
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Dec 5 19:21:43 2025 -0800
Revert "[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause"
This reverts commit 16a3f6c36b9dda0f009dd3e9e9a8fcb002fce9d8.
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 10 +-
.../ResolveRowLevelCommandAssignments.scala | 2 +-
.../sql/connector/MergeIntoTableSuiteBase.scala | 1381 +++++++++++---------
.../execution/command/PlanResolutionSuite.scala | 98 +-
4 files changed, 783 insertions(+), 708 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 23d32dd87db1..08c31939f161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1736,8 +1736,9 @@ class Analyzer(
Assignment(key, sourceAttr)
}
} else {
- targetTable.output.map { attr =>
- Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ sourceTable.output.flatMap { sourceAttr =>
+ findAttrInTarget(sourceAttr.name).map(
+ targetAttr => Assignment(targetAttr, sourceAttr))
}
}
UpdateAction(
@@ -1774,8 +1775,9 @@ class Analyzer(
Assignment(key, sourceAttr)
}
} else {
- targetTable.output.map { attr =>
- Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
+ sourceTable.output.flatMap { sourceAttr =>
+ findAttrInTarget(sourceAttr.name).map(
+ targetAttr => Assignment(targetAttr, sourceAttr))
}
}
InsertAction(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index bf1016ba8268..d1b8eab13191 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -53,7 +53,7 @@ object ResolveRowLevelCommandAssignments extends
Rule[LogicalPlan] {
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved &&
m.rewritable && !m.aligned &&
!m.needSchemaEvolution =>
validateStoreAssignmentPolicy()
- val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes &&
m.withSchemaEvolution
+ val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes
m.copy(
targetTable = cleanAttrMetadata(m.targetTable),
matchedActions = alignActions(m.targetTable.output, m.matchedActions,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 611daef36db0..e59b4435c408 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -2404,7 +2404,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sourceDF.createOrReplaceTempView("source")
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt = s"""MERGE $schemaEvolutionClause
+ sql(s"""MERGE $schemaEvolutionClause
|INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
@@ -2412,9 +2412,8 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| UPDATE SET *
|WHEN NOT MATCHED THEN
| INSERT *
- |""".stripMargin
+ |""".stripMargin)
if (withSchemaEvolution && schemaEvolutionEnabled) {
- sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
@@ -2425,12 +2424,15 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Row(5, 250, "executive", true),
Row(6, 350, null, false)))
} else {
- val e = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
- assert(e.message.contains("A column, variable, or function parameter
with name " +
- "`dep` cannot be resolved"))
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 150, "marketing"),
+ Row(5, 250, "executive"),
+ Row(6, 350, null)))
}
sql(s"DROP TABLE $tableNameAsString")
}
@@ -2461,7 +2463,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sourceDF.createOrReplaceTempView("source")
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt = s"""MERGE $schemaEvolutionClause
+ sql(s"""MERGE $schemaEvolutionClause
|INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
@@ -2469,9 +2471,8 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| UPDATE SET *
|WHEN NOT MATCHED THEN
| INSERT *
- |""".stripMargin
+ |""".stripMargin)
if (withSchemaEvolution && schemaEvolutionEnabled) {
- sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
@@ -2482,12 +2483,15 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Row(5, 250, "executive", true),
Row(6, 350, "unknown", false)))
} else {
- val e = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
- assert(e.getMessage.contains("A column, variable, or function
parameter with name " +
- "`dep` cannot be resolved"))
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 150, "marketing"),
+ Row(5, 250, "executive"),
+ Row(6, 350, "unknown")))
}
sql(s"DROP TABLE $tableNameAsString")
}
@@ -3235,13 +3239,23 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
|WHEN NOT MATCHED THEN
| INSERT *
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)),
"sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
@@ -3321,18 +3335,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
|WHEN NOT MATCHED THEN
| INSERT *
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3508,18 +3534,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"sales"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"sales"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `m`.`key`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3574,18 +3612,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT (pk, m, dep) VALUES (src.pk, src.m, 'my_new_dep')
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"my_old_dep"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"my_new_dep")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"my_old_dep"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"my_new_dep")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `m`.`key`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3638,18 +3688,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "sales"),
- Row(2, Array(Row(20, null, false)), "engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "sales"),
+ Row(2, Array(Row(20, null, false)), "engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct
`a`.`element`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `a`.`element`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3702,18 +3764,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT (pk, a, dep) VALUES (src.pk, src.a, 'my_new_dep')
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "my_old_dep"),
- Row(2, Array(Row(20, null, false)), "my_new_dep")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "my_old_dep"),
+ Row(2, Array(Row(20, null, false)), "my_new_dep")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct
`a`.`element`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `a`.`element`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3977,8 +4051,6 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"UNRESOLVED_COLUMN.WITH_SUGGESTION")
- assert(exception.message.contains(" A column, variable, or function
parameter with name "
- + "`bonus` cannot be resolved"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -4412,321 +4484,267 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("merge into with source missing fields in struct nested in array") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 3 fields (c1, c2, c3) in array
- createAndInitTable(
- s"""pk INT NOT NULL,
- |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ],
"dep": "sales" }
- |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ],
"dep": "sales" }"""
- .stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields (c1, c2, c3) in array
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], "dep":
"sales" }
+ |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ],
"dep": "sales" }"""
+ .stripMargin)
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("a", ArrayType(
- StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType))))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Array(Row(10, "c")), "hr"),
- Row(2, Array(Row(30, "e")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("a", ArrayType(
+ StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Array(Row(10, "c")), "hr"),
+ Row(2, Array(Row(30, "e")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Array(Row(1, "a", true)), "sales"),
+ Row(1, Array(Row(10, "c", null)), "hr"),
+ Row(2, Array(Row(30, "e", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Array(Row(1, "a", true)), "sales"),
- Row(1, Array(Row(10, "c", null)), "hr"),
- Row(2, Array(Row(30, "e", null)), "engineering")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `a`.`element`.`c3`."))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
test("merge into with source missing fields in struct nested in map key") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 2 fields in map key
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType),
StructField("c2", BooleanType))),
- StructType(Seq(StructField("c3", StringType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
- val targetData = Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(20, false) -> Row("y")), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
-
- // Source table has struct with only 1 field (c1) in map key -
missing c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map key
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))), // missing c2
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", BooleanType))),
StructType(Seq(StructField("c3", StringType))))),
StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val targetData = Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(20, false) -> Row("y")), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- if (coerceNestedTypes && withSchemaEvolution) {
+ // Source table has struct with only 1 field (c1) in map key -
missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))), // missing c2
+ StructType(Seq(StructField("c3", StringType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(10, null) -> Row("z")), "sales"),
+ Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(10, null) -> Row("z")), "sales"),
- Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `m`.`key`.`c2`."))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
test("merge into with source missing fields in struct nested in map value") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 2 fields in map value
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType),
StructField("c2", BooleanType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
- val targetData = Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(20) -> Row("y", false)), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
-
- // Source table has struct with only 1 field (c1) in map value -
missing c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map value
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
StructField("m", MapType(
StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType))))), // missing c2
+ StructType(Seq(StructField("c1", StringType),
StructField("c2", BooleanType))))),
StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
-
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
-
- if (coerceNestedTypes && withSchemaEvolution) {
- sql(mergeStmt)
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(10) -> Row("z", null)), "sales"),
- Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- }
- }
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
- }
- }
- }
- }
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
- test("merge into with source missing fields in top-level struct") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 3 fields at top level
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
+ val targetData = Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(20) -> Row("y", false)), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 1 field (c1) in map value -
missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))),
+ StructType(Seq(StructField("c1", StringType))))), // missing c2
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(10) -> Row("z", null)), "sales"),
+ Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `m`.`value`.`c2`."))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
- test("merge into with source missing top-level column") {
- Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- // Target table has 3 columns: pk, salary, dep
- createAndInitTable(
- s"""pk INT NOT NULL,
- |salary INT,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "salary": 100, "dep": "sales" }
- |{ "pk": 1, "salary": 200, "dep": "hr" }"""
- .stripMargin)
+ test("merge into with source missing fields in top-level struct") {
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields at top level
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
- // Source table has only 2 columns: pk, dep (missing salary)
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, "engineering"),
- Row(2, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, 100, "sales"),
- Row(1, 200, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes) {
sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c3`."))
}
- assert(exception.errorClass.get ==
- "UNRESOLVED_COLUMN.WITH_SUGGESTION")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
@@ -4866,69 +4884,70 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("merge with with null struct with missing nested field") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has nested struct with fields c1 and c2
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
- .stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has nested struct with fields c1 and c2
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ .stripMargin)
- // Source table has null for the nested struct
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
- ))),
- StructField("dep", StringType)
- ))
+ // Source table has null for the nested struct
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ // missing field 'b'
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ // Without coercion, the merge should fail due to missing field
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c2`.`b`."))
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
@@ -4979,21 +4998,37 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
- // extra nested field is added
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ // extra nested field is added
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x", null)), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ // extra nested field is not added
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot write extra fields `c` to the struct `s`.`c2`"))
+ }
} else {
+ // Without source struct coercion, the merge should fail
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c2`.`b`."))
}
}
}
@@ -5062,83 +5097,82 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("merge with null struct using default value") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- | pk INT NOT NULL,
- | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
- | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b',
'default')),
- | dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ | pk INT NOT NULL,
+ | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+ | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b',
'default')),
+ | dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
- val initialSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- val initialData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(initialData),
initialSchema)
- .writeTo(tableNameAsString).append()
+ val initialSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val initialData = Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(2, Row(20, "y")), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(initialData),
initialSchema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- )))
- ))),
- StructField("dep", StringType)
- ))
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes && withSchemaEvolution) {
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`b`"))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
@@ -5209,8 +5243,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("merge into with source missing fields in nested struct") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { nestedTypeCoercion =>
+ Seq(true, false).foreach { nestedTypeCoercion =>
withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key
-> nestedTypeCoercion.toString) {
withTempView("source") {
@@ -5242,8 +5275,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
.createOrReplaceTempView("source")
// Missing field b should be filled with NULL
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt = s"""MERGE $schemaEvolutionClause INTO
$tableNameAsString t
+ val mergeStmt = s"""MERGE INTO $tableNameAsString t
|USING source src
|ON t.pk = src.pk
|WHEN MATCHED THEN
@@ -5252,7 +5284,7 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (nestedTypeCoercion && withSchemaEvolution) {
+ if (nestedTypeCoercion) {
sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
@@ -5260,17 +5292,16 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Row(1, Row(10, Row(20, null)), "sales"),
Row(2, Row(20, Row(30, null)), "engineering")))
} else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ val exception = intercept[Exception] {
sql(mergeStmt)
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ """Cannot write incompatible data for the table
``""".stripMargin))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- }
}
test("merge with named_struct missing non-nullable field") {
@@ -5895,18 +5926,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes && withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)),
"sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -5995,18 +6038,30 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes && withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
} else {
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -6077,190 +6132,198 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
}
test("merge into with source missing fields in top-level struct using
dataframe API") {
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- // Target table has struct with 3 fields at top level
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING)""".stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ // Target table has struct with 3 fields at top level
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING)""".stripMargin)
- val targetData = Seq(
- Row(0, Row(1, "a", true), "sales")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType),
- StructField("c3", BooleanType)
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
+ val targetData = Seq(
+ Row(0, Row(1, "a", true), "sales")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- // Create source table with struct having only 2 fields (c1, c2) -
missing c3
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
+ // Create source table with struct having only 2 fields (c1, c2) -
missing c3
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))),
- StructField("dep", StringType)))
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))),
+ StructField("dep", StringType)))
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
- val mergeBuilder = spark.table(sourceTable)
+ if (coerceNestedTypes) {
+ spark.table(sourceTable)
.mergeInto(tableNameAsString, $"source_table.pk" ===
col(tableNameAsString + ".pk"))
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
+ .merge()
- if (coerceNestedTypes && withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
-
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" ===
col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
}
-
- sql(s"DROP TABLE $tableNameAsString")
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c3`."))
}
+
+ sql(s"DROP TABLE $tableNameAsString")
}
}
}
}
test("merge with null struct with missing nested field using dataframe API")
{
- Seq(true, false).foreach { withSchemaEvolution =>
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- // Target table has nested struct with fields c1 and c2
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
- |dep STRING)""".stripMargin)
-
- val targetData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ // Target table has nested struct with fields c1 and c2
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING)""".stripMargin)
+
+ val targetData = Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(2, Row(20, "y")), "hr")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- // Create source table with missing nested field 'b'
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
- ))),
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
+ // Create source table with missing nested field 'b'
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ // missing field 'b'
+ )))
+ ))),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- // Source table has null for the nested struct
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ // Source table has null for the nested struct
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
- val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString,
- $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ val mergeBuilder = spark.table(sourceTable)
+ .mergeInto(tableNameAsString,
+ $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
- if (coerceNestedTypes && withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception =
intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ if (coerceNestedTypes) {
+ mergeBuilder.merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ // Without coercion, the merge should fail due to missing field
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
}
-
- sql(s"DROP TABLE $tableNameAsString")
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c2`.`b`."))
}
+
+ sql(s"DROP TABLE $tableNameAsString")
}
}
}
@@ -6346,21 +6409,37 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes && withSchemaEvolution) {
- // extra nested field is added
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ // extra nested field is added
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x", null)), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ // extra nested field is not added
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot write extra fields `c` to the struct `s`.`c2`"))
+ }
} else {
+ // Without source struct coercion, the merge should fail
val exception =
intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c2`.`b`."))
}
sql(s"DROP TABLE $tableNameAsString")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8e5ee1644f9c..18042bf73adf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1633,10 +1633,10 @@ class PlanResolutionSuite extends SharedSparkSession
with AnalysisTest {
if (starInUpdate) {
assert(updateAssigns.size == 2)
-
assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
-
assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
-
assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
-
assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
+
assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
+
assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
+
assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
+
assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
} else {
assert(updateAssigns.size == 1)
assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
@@ -1656,10 +1656,10 @@ class PlanResolutionSuite extends SharedSparkSession
with AnalysisTest {
if (starInInsert) {
assert(insertAssigns.size == 2)
-
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
-
assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
-
assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
-
assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
+
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
+
assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
+
assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
+
assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
} else {
assert(insertAssigns.size == 2)
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
@@ -1720,40 +1720,8 @@ class PlanResolutionSuite extends SharedSparkSession
with AnalysisTest {
case other => fail("Expect MergeIntoTable, but got:\n" +
other.treeString)
}
- // star with schema evolution
- val sqlStarSchemaEvolution =
- s"""
- |MERGE WITH SCHEMA EVOLUTION INTO $target AS target
- |USING $source AS source
- |ON target.i = source.i
- |WHEN MATCHED AND (target.s='delete') THEN DELETE
- |WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
- |WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
- """.stripMargin
- parseAndResolve(sqlStarSchemaEvolution) match {
- case MergeIntoTable(
- SubqueryAlias(AliasIdentifier("target", Seq()),
AsDataSourceV2Relation(target)),
- SubqueryAlias(AliasIdentifier("source", Seq()),
AsDataSourceV2Relation(source)),
- mergeCondition,
- Seq(DeleteAction(Some(EqualTo(dl: AttributeReference,
StringLiteral("delete")))),
- UpdateAction(Some(EqualTo(ul: AttributeReference,
- StringLiteral("update"))), updateAssigns, _)),
- Seq(InsertAction(Some(EqualTo(il: AttributeReference,
StringLiteral("insert"))),
- insertAssigns)),
- Seq(),
- withSchemaEvolution) =>
- checkMergeConditionResolution(target, source, mergeCondition)
- checkMatchedClausesResolution(target, source, Some(dl), Some(ul),
updateAssigns,
- starInUpdate = true)
- checkNotMatchedClausesResolution(target, source, Some(il),
insertAssigns,
- starInInsert = true)
- assert(withSchemaEvolution === true)
-
- case other => fail("Expect MergeIntoTable, but got:\n" +
other.treeString)
- }
-
// star
- val sqlStarWithoutSchemaEvolution =
+ val sql2 =
s"""
|MERGE INTO $target AS target
|USING $source AS source
@@ -1762,7 +1730,7 @@ class PlanResolutionSuite extends SharedSparkSession with
AnalysisTest {
|WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
|WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
""".stripMargin
- parseAndResolve(sqlStarWithoutSchemaEvolution) match {
+ parseAndResolve(sql2) match {
case MergeIntoTable(
SubqueryAlias(AliasIdentifier("target", Seq()),
AsDataSourceV2Relation(target)),
SubqueryAlias(AliasIdentifier("source", Seq()),
AsDataSourceV2Relation(source)),
@@ -2368,11 +2336,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
|USING testcat.tab2
|ON 1 = 1
|WHEN MATCHED THEN UPDATE SET *""".stripMargin
- checkError(
- exception = intercept[AnalysisException](parseAndResolve(sql2)),
- condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
- parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
- context = ExpectedContext(fragment = sql2, start = 0, stop = 80))
+ val parsed2 = parseAndResolve(sql2)
+ parsed2 match {
+ case MergeIntoTable(
+ AsDataSourceV2Relation(target),
+ AsDataSourceV2Relation(source),
+ EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+ Seq(UpdateAction(None, updateAssigns, _)), // Matched actions
+ Seq(), // Not matched actions
+ Seq(), // Not matched by source actions
+ withSchemaEvolution) =>
+ val ti = target.output.find(_.name == "i").get
+ val si = source.output.find(_.name == "i").get
+ assert(updateAssigns.size == 1)
+
assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+
assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(withSchemaEvolution === false)
+ case other => fail("Expect MergeIntoTable, but got:\n" +
other.treeString)
+ }
// INSERT * with incompatible schema between source and target tables.
val sql3 =
@@ -2380,11 +2361,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
|USING testcat.tab2
|ON 1 = 1
|WHEN NOT MATCHED THEN INSERT *""".stripMargin
- checkError(
- exception = intercept[AnalysisException](parseAndResolve(sql3)),
- condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
- parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
- context = ExpectedContext(fragment = sql3, start = 0, stop = 80))
+ val parsed3 = parseAndResolve(sql3)
+ parsed3 match {
+ case MergeIntoTable(
+ AsDataSourceV2Relation(target),
+ AsDataSourceV2Relation(source),
+ EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+ Seq(), // Matched action
+ Seq(InsertAction(None, insertAssigns)), // Not matched actions
+ Seq(), // Not matched by source actions
+ withSchemaEvolution) =>
+ val ti = target.output.find(_.name == "i").get
+ val si = source.output.find(_.name == "i").get
+ assert(insertAssigns.size == 1)
+
assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+
assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(withSchemaEvolution === false)
+ case other => fail("Expect MergeIntoTable, but got:\n" +
other.treeString)
+ }
val sql4 =
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]