This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 558ea1ce32ab [SPARK-56550][SQL] Support source with fewer
columns/fields in INSERT INTO WITH SCHEMA EVOLUTION
558ea1ce32ab is described below
commit 558ea1ce32aba863c7eb476cfb5020aa227170db
Author: Szehon Ho <[email protected]>
AuthorDate: Wed May 20 02:11:30 2026 +0800
[SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO
WITH SCHEMA EVOLUTION
### What changes were proposed in this pull request?
Add support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing
nested struct fields with null (or column defaults) when the source has fewer
fields than the target table. This mirrors the existing `MERGE INTO` behavior
gated by `spark.sql.mergeNestedTypeCoercion.enabled`.
**Specific changes:**
1. **New config flag**: `spark.sql.insertNestedTypeCoercion.enabled`
(internal, default `false`) — mirrors the existing
`spark.sql.mergeNestedTypeCoercion.enabled` for MERGE.
2. **Refactored `TableOutputResolver.resolveOutputColumns`**: Replaced two
overlapping boolean parameters (`supportColDefaultValue`, `fillNestedDefaults`)
with a single `DefaultValueFillMode` enum (`NONE`, `FILL`, `RECURSE`), making
the API cleaner and the intent explicit at each call site.
3. **RECURSE mode for V2 inserts**: When both schema evolution and the
coercion flag are enabled, `RECURSE` mode fills missing nested struct fields
with null, relaxes the by-position arity check, and recurses into structs
nested within arrays and maps.
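The mode selection and arity rule in items 2 and 3 can be modeled in a few lines of Scala. This is a simplified, hypothetical sketch, not Spark's code:

- `FillModeSketch` is an illustrative name. `chooseMode` mirrors the `conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled` check added to the `Analyzer`.
- `checkByPositionArity` mirrors the relaxed column-count check in `resolveOutputColumns`.
- The `Either` result and the error strings are placeholders for Spark's `QueryCompilationErrors`.

```scala
// Hypothetical, simplified model of fill-mode selection and the by-position arity
// check. Names mirror the commit, but this is not Spark's TableOutputResolver.
object FillModeSketch {
  // Stand-in for TableOutputResolver.DefaultValueFillMode.
  object DefaultValueFillMode extends Enumeration {
    val NONE, FILL, RECURSE = Value
  }
  import DefaultValueFillMode.{FILL, RECURSE}

  // V2 INSERT path: RECURSE only when both the coercion flag and schema evolution
  // are enabled; otherwise FILL (missing top-level columns only).
  def chooseMode(
      coerceInsertNestedTypes: Boolean,
      schemaEvolutionEnabled: Boolean): DefaultValueFillMode.Value =
    if (coerceInsertNestedTypes && schemaEvolutionEnabled) RECURSE else FILL

  // By-position arity: extra source columns are always rejected; missing trailing
  // columns are rejected unless the mode is RECURSE. Error strings are placeholders.
  def checkByPositionArity(
      expected: Int,
      actual: Int,
      mode: DefaultValueFillMode.Value): Either[String, Unit] =
    if (expected < actual) Left("too many data columns")
    else if (expected > actual && mode != RECURSE) Left("not enough data columns")
    else Right(())
}
```

In this model, `NONE` is never chosen on the V2 insert path. Per the diff, `NONE` remains the default of the `defaultValueFillMode` parameter, while the V1 `PreprocessTableInsertion` rule passes `FILL`.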
**Supported scenarios** (source has fewer columns/fields than target, with
schema evolution + coercion flag):
| Scenario | Before | After (+ coercion flag) |
|---|---|---|
| Missing top-level column (by name) | fill with default/null | same (unchanged) |
| Missing top-level column with DEFAULT (by name) | fill with default value | same (unchanged) |
| Missing top-level column (by position) | error | fill trailing with default/null |
| Missing top-level column with DEFAULT (by position) | error | fill trailing with default value |
| Missing nested struct field (by name) | error | fill with null |
| Missing nested struct field (by position) | error | fill with null |
| Missing field in struct inside array (by name) | error | fill with null |
| Missing field in struct inside map value (by name) | error | fill with null |
| Missing deeply nested struct field (by name) | error | fill with null |
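The by-position rows of the table ("fill trailing with default/null") amount to appending one value per missing trailing column. Below is a value-level toy under stated assumptions:

- `TrailingFillSketch`, `TargetCol` and `fillTrailing` are hypothetical names.
- A column default is modeled as `Option[Any]`.
- `useNullsForMissingDefaults` stands in for `conf.useNullsForMissingDefaultColumnValues`.

Spark itself builds the equivalent projection as expressions during analysis.

```scala
// Hypothetical value-level model of RECURSE-mode by-position filling of trailing
// columns. Not Spark's API; Spark builds expressions instead of transforming values.
object TrailingFillSketch {
  // A target column with an optional DEFAULT value (modeled as Option[Any]).
  final case class TargetCol(name: String, default: Option[Any])

  def fillTrailing(
      row: Seq[Any],
      target: Seq[TargetCol],
      useNullsForMissingDefaults: Boolean): Seq[Any] = {
    require(row.size <= target.size, "source has more columns than the target")
    val trailing = target.drop(row.size).map { col =>
      col.default match {
        case Some(v) => v                                // column declares a DEFAULT
        case None if useNullsForMissingDefaults => null  // no DEFAULT: fall back to null
        case None =>
          throw new IllegalArgumentException(s"cannot find data for ${col.name}")
      }
    }
    // Source values keep their positions; only columns past the end are filled.
    row ++ trailing
  }
}
```

With the test table `(id int, salary int, dep string DEFAULT 'unknown')`, `fillTrailing(Seq(1, 200), ...)` yields `Seq(1, 200, "unknown")`.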
### Why are the changes needed?
`MERGE INTO` already supports coercing nested types when the source has
fewer struct fields than the target (via
`spark.sql.mergeNestedTypeCoercion.enabled`). `INSERT INTO WITH SCHEMA
EVOLUTION` lacked this capability, causing errors for legitimate use cases
where the source schema is a subset of the target schema at the nested level.
This is important for schema evolution workflows where tables accumulate
new nested fields over time, but older data sources may not have all fields
populated.
### Does this PR introduce _any_ user-facing change?
Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true`
(default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` will no longer fail
when the source has fewer nested struct fields than the target, or, for
by-position inserts, fewer trailing top-level columns. Instead, missing nested
fields are filled with null and missing trailing columns with their default
value (or null). This is gated behind an internal, experimental config flag.
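The recursive null-filling of missing nested fields can likewise be sketched as a toy, value-level model. Everything here is hypothetical and simplified:

- Structs are `Map[String, Any]` keyed by field name, arrays are `Seq[Any]`, and map values (not keys) are recursed into.
- Only by-name matching is modeled.
- A null parent struct is kept null rather than expanded into a struct of nulls. This is an assumption about how the "null struct with missing field" cases behave.

```scala
// Toy, value-level model of RECURSE-mode nested filling by name. The types and
// values are simplified stand-ins, not Spark's DataType or InternalRow.
object NestedFillSketch {
  sealed trait DType
  case object Leaf extends DType
  final case class StructT(fields: Seq[(String, DType)]) extends DType
  final case class ArrayT(element: DType) extends DType
  final case class MapT(value: DType) extends DType

  // Structs are Map[String, Any], arrays are Seq[Any], maps are Map[Any, Any];
  // a Scala null stands for SQL NULL.
  def fill(value: Any, target: DType): Any = (value, target) match {
    case (null, _) => null // assumed: a null struct stays null
    case (s: Map[String, Any] @unchecked, StructT(fields)) =>
      val known = fields.map(_._1).toSet
      require(s.keySet.subsetOf(known), s"unexpected fields: ${s.keySet -- known}")
      fields.map { case (name, t) =>
        // A missing field becomes null; a present field is filled recursively.
        name -> s.get(name).map(v => fill(v, t)).orNull
      }.toMap
    case (xs: Seq[Any] @unchecked, ArrayT(element)) => xs.map(fill(_, element))
    case (m: Map[Any, Any] @unchecked, MapT(valueType)) =>
      m.map { case (k, v) => k -> fill(v, valueType) }
    case (v, Leaf) => v
    case (v, t) => throw new IllegalArgumentException(s"value $v does not match $t")
  }
}
```

Spark performs the equivalent rewrite at analysis time by building expressions (the diff uses `CreateStruct`, `ArrayTransform`, and `MapFromArrays`) instead of transforming values.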
### How was this patch tested?
Added 17 new test cases in `InsertIntoSchemaEvolutionTests`:
**Positive tests** (with schema evolution + coercion flag):
- Missing top-level column by name / by position
- Missing top-level column with DEFAULT value by name / by position
- Missing nested struct field by name / by position
- Missing field in struct nested in array / map value
- Missing deeply nested struct field
- Null struct with missing field by name / by position
- Mixed null and non-null structs with missing field
- Null deeply nested struct with missing field
- Null struct in array with missing field
**Negative tests** (verifying errors when coercion is disabled):
- Missing top-level column by name / by position (without evolution)
- Missing nested struct field by name / by position (without evolution)
- Missing nested struct field with evolution but without coercion flag
All 64 matched tests pass.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)
Closes #55427 from szehon-ho/insert-schema-evolution-missing-fields.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 10 +-
.../catalyst/analysis/TableOutputResolver.scala | 124 +++-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +
.../spark/sql/execution/datasources/rules.scala | 2 +-
.../spark/sql/connector/InsertIntoTests.scala | 738 +++++++++++++++++++++
.../configs-without-binding-policy-exceptions | 1 +
6 files changed, 853 insertions(+), 37 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b1eaec8bb521..a27b6a19b4c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.internal.config.ConfigBindingPolicy
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._
import org.apache.spark.sql.catalyst.analysis.resolver.{
AnalyzerBridgeState,
HybridAnalyzer,
@@ -3789,9 +3790,16 @@ class Analyzer(
validateStoreAssignmentPolicy()
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
expected = v2Write.table.output, queryOutput = v2Write.query.output)
+ // With schema evolution + coercion flag, missing top-level columns AND missing nested
+ // struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing
+ // top-level columns are filled via FILL mode; missing nested struct fields still cause
+ // schema enforcement errors.
+ val defaultValueFillMode =
+ if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE
+ else FILL
val projection = TableOutputResolver.resolveOutputColumns(
v2Write.table.name, v2Write.table.output, v2Write.query,
v2Write.isByName, conf,
- supportColDefaultValue = true)
+ defaultValueFillMode)
if (projection != v2Write.query) {
val cleanedTable = v2Write.table match {
case r: DataSourceV2Relation =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 7eacc5ab9b2a..d691c449733f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with Logging {
/**
* Modes for filling in default or null values for missing columns.
- * If FILL, fill missing top-level columns with their default values.
- * If RECURSE, fill missing top-level columns and also recurse into nested struct
- * fields to fill null.
+ * If FILL, fill missing top-level columns with their default values (by-name reorder path).
+ * If RECURSE, fill missing top-level columns (including trailing columns on the by-position
+ * path for INSERT with schema evolution when enabled) and recurse into nested structs,
+ * arrays, and maps to fill missing struct fields with null or defaults.
* If NONE, do not fill any missing columns.
*/
object DefaultValueFillMode extends Enumeration {
@@ -92,19 +93,22 @@ object TableOutputResolver extends SQLConfHelper with Logging {
query: LogicalPlan,
byName: Boolean,
conf: SQLConf,
- supportColDefaultValue: Boolean = false): LogicalPlan = {
+ defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = {
if (expected.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, expected.map(_.name), query.output)
}
+ // In RECURSE mode, allow fewer source columns than target by filling trailing columns
+ // with defaults. In other modes, a column count mismatch in by-position resolution is
+ // an error.
+ val fillDefaultValue = defaultValueFillMode == RECURSE
val errors = new mutable.ArrayBuffer[String]()
val resolved: Seq[NamedExpression] = if (byName) {
- // If a top-level column does not have a corresponding value in the input query, fill with
- // the column's default value. We need to pass `fillDefaultValue` as FILL here, if the
- // `supportColDefaultValue` parameter is also true.
- val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE
+ // By-name resolution: the defaultValueFillMode is passed through to control whether
+ // missing top-level columns are filled (FILL/RECURSE) and whether missing nested
+ // struct fields are also filled (RECURSE only).
reorderColumnsByName(
tableName,
query.output,
@@ -112,13 +116,15 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf,
errors += _,
Nil,
- defaultValueFillMode)
+ defaultValueFillMode,
+ enforceFullOutput = true)
} else {
- if (expected.size > query.output.size) {
+ if (expected.size > query.output.size && !fillDefaultValue) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, expected.map(_.name), query.output)
}
- resolveColumnsByPosition(tableName, query.output, expected, conf, errors += _)
+ resolveColumnsByPosition(
+ tableName, query.output, expected, conf, errors += _, fillDefaultValue = fillDefaultValue)
}
if (errors.nonEmpty) {
@@ -157,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with Logging {
case (valueType: StructType, colType: StructType) =>
val resolvedValue = resolveStructType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false)
resolvedValue.getOrElse(value)
case (valueType: ArrayType, colType: ArrayType) =>
val resolvedValue = resolveArrayType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false)
resolvedValue.getOrElse(value)
case (valueType: MapType, colType: MapType) =>
val resolvedValue = resolveMapType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false)
resolvedValue.getOrElse(value)
case _ =>
checkUpdate(tableName, value, col, conf, addError, colPath)
@@ -304,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String] = Nil,
- defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] = {
+ defaultValueFillMode: DefaultValueFillMode.Value,
+ enforceFullOutput: Boolean = false): Seq[NamedExpression] = {
val matchedCols = mutable.HashSet.empty[String]
val reordered = expectedCols.flatMap { expectedCol =>
val matched = inputCols.filter(col => conf.resolver(col.name, expectedCol.name))
@@ -336,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with Logging {
case (matchedType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput)
case (matchedType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput)
case (matchedType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, matchedCol, matchedType, actualExpectedCol, expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput)
case _ =>
checkField(
tableName, actualExpectedCol, matchedCol, byName = true, conf,
addError, newColPath)
@@ -366,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with Logging {
} else {
reordered
}
+ } else if (enforceFullOutput) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else expectedCols.map(_.name).map(toSQLId).mkString(", ")
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName)
} else {
Nil
}
@@ -377,7 +389,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
expectedCols: Seq[Attribute],
conf: SQLConf,
addError: String => Unit,
- colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+ colPath: Seq[String] = Nil,
+ fillDefaultValue: Boolean = false): Seq[NamedExpression] = {
val actualExpectedCols = expectedCols.map { attr =>
attr.withDataType { CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
}
@@ -393,7 +406,7 @@ object TableOutputResolver extends SQLConfHelper with Logging {
tableName, colPath.quoted, extraColsStr
)
}
- } else if (inputCols.size < actualExpectedCols.size) {
+ } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) {
val missingColsStr =
actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
@@ -407,25 +420,48 @@ object TableOutputResolver extends SQLConfHelper with Logging {
}
}
- inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+ val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
val newColPath = colPath :+ expectedCol.name
(inputCol.dataType, expectedCol.dataType) match {
case (inputType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true)
case (inputType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true)
case (inputType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true)
case _ =>
checkField(tableName, expectedCol, inputCol, byName = false, conf,
addError, newColPath)
}
}
+
+ val defaults = if (fillDefaultValue) {
+ actualExpectedCols.drop(inputCols.size).map { expectedCol =>
+ val defaultExpr = getDefaultValueExprOrNullLit(
+ expectedCol, conf.useNullsForMissingDefaultColumnValues)
+ if (defaultExpr.isEmpty) {
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+ tableName, (colPath :+ expectedCol.name).quoted)
+ }
+ applyColumnMetadata(defaultExpr.get, expectedCol)
+ }
+ } else {
+ Nil
+ }
+
+ val result = matched ++ defaults
+ if (result.length != actualExpectedCols.size) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ")
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName)
+ }
+ result
}
private[sql] def checkNullability(
@@ -447,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with Logging {
input.nullable && !attr.nullable && conf.storeAssignmentPolicy !=
StoreAssignmentPolicy.LEGACY
}
+ // scalastyle:off argcount
private def resolveStructType(
tableName: String,
input: Expression,
@@ -457,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val fields = inputType.zipWithIndex.map { case (f, i) =>
Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
@@ -465,10 +503,10 @@ object TableOutputResolver extends SQLConfHelper with Logging {
val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
val resolved = if (byName) {
reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath,
- defaultValueMode)
+ defaultValueMode, enforceFullOutput)
} else {
resolveColumnsByPosition(
- tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+ tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue)
}
if (resolved.length == expectedType.length) {
val struct = CreateStruct(resolved)
@@ -478,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with Logging {
struct
}
Some(applyColumnMetadata(res, expected))
+ } else if (enforceFullOutput) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else expectedType.fields.map(_.name).map(toSQLId).mkString(", ")
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName)
} else {
None
}
@@ -493,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull)
val fakeAttr =
@@ -501,9 +545,10 @@ object TableOutputResolver extends SQLConfHelper with Logging {
val res = if (byName) {
val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath,
- defaultValueMode)
+ defaultValueMode, enforceFullOutput)
} else {
- resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath)
+ resolveColumnsByPosition(
+ tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue)
}
if (res.length == 1) {
val castedArray =
@@ -515,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with Logging {
ArrayTransform(nullCheckedInput, func)
}
Some(applyColumnMetadata(castedArray, expected))
+ } else if (enforceFullOutput) {
+ val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name)
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName)
} else {
None
}
@@ -530,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false)
@@ -538,9 +587,10 @@ object TableOutputResolver extends SQLConfHelper with Logging {
val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE
val resKey = if (byName) {
reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath,
- defaultValueFillMode)
+ defaultValueFillMode, enforceFullOutput)
} else {
- resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
+ resolveColumnsByPosition(
+ tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue)
}
val valueParam =
@@ -549,10 +599,10 @@ object TableOutputResolver extends SQLConfHelper with Logging {
AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)()
val resValue = if (byName) {
reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath,
- defaultValueFillMode)
+ defaultValueFillMode, enforceFullOutput)
} else {
resolveColumnsByPosition(
- tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
+ tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue)
}
if (resKey.length == 1 && resValue.length == 1) {
@@ -577,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with Logging {
MapFromArrays(newKeys, newValues)
}
Some(applyColumnMetadata(casted, expected))
+ } else if (enforceFullOutput) {
+ val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name)
+ throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName)
} else {
None
}
}
+ // scalastyle:on argcount
// For table insertions, capture the overflow errors and show proper message.
// Without this method, the overflow errors of castings will show hints for turning off ANSI SQL
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5ed831f20f39..498cf2a1cded 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7305,6 +7305,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+ buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+ .internal()
+ .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
+ "struct fields with null when the source has fewer nested fields than the target " +
+ "table. Also relaxes by-position column-count enforcement so trailing missing " +
+ "top-level columns are filled with their default value (or null). This is " +
+ "experimental and the semantics may change.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
val TIME_TYPE_ENABLED =
buildConf("spark.sql.timeType.enabled")
.internal()
@@ -8634,6 +8646,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def coerceMergeNestedTypes: Boolean =
getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED)
+ def coerceInsertNestedTypes: Boolean =
+ getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED)
+
def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED)
def listaggAllowDistinctCastWithOrder: Boolean =
getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ce41bbe4aeb3..7122dd52ef1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -528,7 +528,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
query,
byName,
conf,
- supportColDefaultValue = true)
+ TableOutputResolver.DefaultValueFillMode.FILL)
} catch {
case e: AnalysisException if staticPartCols.nonEmpty &&
(e.getCondition ==
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 4f023136a6fe..42017c2dd60e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests =>
byName: Boolean = false,
replaceWhere: Option[String] = None): Unit
+ /** Insert data into a table by name without schema evolution. */
+ protected def doInsertByName(
+ tableName: String,
+ insert: DataFrame,
+ mode: SaveMode = SaveMode.Append): Unit = {
+ val tmpView = "tmp_view"
+ withTempView(tmpView) {
+ insert.createOrReplaceTempView(tmpView)
+ val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+ sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView")
+ }
+ }
+
+ /** Run a block with INSERT nested type coercion enabled. */
+ protected def withInsertNestedTypeCoercion(f: => Unit): Unit = {
+ withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") {
+ f
+ }
+ }
+
test("Insert schema evolution: extra column - no auto-schema-evolution capability") {
val t1 = s"${catalogAndNamespace}tbl"
withTable(t1) {
@@ -1403,4 +1423,722 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests =>
assert(spark.table(t1).schema("id").dataType === IntegerType)
}
}
+
+ // ---------------------------------------------------------------------------
+ // Tests for source with fewer columns/fields than target
+ // ---------------------------------------------------------------------------
+
+ test("Insert schema evolution: source missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val schema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType)))
+ val data = Seq(Row(0, 100, "sales"))
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // By position: source col 1 maps to target col 1, source col 2 maps to target col 2,
+ // trailing target col 3 is filled with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in array by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in array by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing deeply nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+ }
+ }
+
+ test("Insert schema evolution: null deeply nested struct with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+ }
+ }
+
+ test("Insert schema evolution: null struct in array with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+ // but names differ; by-name resolution should add "active" via schema evolution
+ // and fill "salary" with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+ byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, salary, dep, active FROM $t1"),
+ Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+ // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+ // "c3" with null.
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c4", DoubleType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+ Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+ // ---------------------------------------------------------------------------
+
+ test("Insert without evolution: source missing top-level column by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill
+ // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`salary`")
+ )
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position fails " +
+ "when null default disabled and column has no explicit DEFAULT") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`dep`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position fails " +
+ "when null default disabled") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert without evolution: source missing top-level column by position fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "tableColumns" -> "`id`, `salary`, `dep`",
+ "dataColumns" -> "`id`, `salary`")
+ )
+ }
+ }
+
+ test("Insert without evolution: source missing nested struct field by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertByName(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+
+ test("Insert with evolution but without coercion flag:" +
+ " source missing nested struct field by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+
+ test("Insert without evolution: source missing nested struct field by position fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsert(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`",
+ "missingFields" -> "`c3`")
+ )
+ }
+ }
}
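The positive tests above pin down how missing nested fields are filled once both schema evolution and `spark.sql.insertNestedTypeCoercion.enabled` are on. The following is a minimal, hypothetical Scala model of those by-name expectations, not Spark's `TableOutputResolver` code: the `DType` hierarchy and `NestedFill.fill` are invented for illustration, structs are modelled as `Map[String, Any]` rather than `Row`, and column DEFAULT values are ignored (every missing field is assumed to become null).

```scala
// Hypothetical model of the by-name nested null-fill asserted by the tests
// above. These types are NOT Spark's DataType classes.
sealed trait DType
case object IntT extends DType
case object StrT extends DType
case object BoolT extends DType
final case class StructT(fields: List[(String, DType)]) extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType

object NestedFill {
  // Reshapes source `value` to the `target` type, matching struct fields by name.
  // Assumes the target already contains any extra source fields (i.e. schema
  // evolution has run); source-only fields are therefore simply dropped here.
  def fill(target: DType, value: Any): Any = (target, value) match {
    // A null struct, array or map stays null; it is never expanded into a
    // struct of nulls.
    case (_, null) => null
    case (StructT(fields), v: Map[_, _]) =>
      val byName = v.asInstanceOf[Map[String, Any]]
      fields.map { case (name, fieldType) =>
        byName.get(name) match {
          case Some(x) => name -> fill(fieldType, x) // present: recurse
          case None    => name -> null               // missing: fill with null
        }
      }.toMap
    // Recurse into the elements of arrays ...
    case (ArrayT(elementType), xs: Seq[_]) =>
      xs.map(x => fill(elementType, x))
    // ... and into map values (keys are left untouched).
    case (MapT(_, valueType), m: Map[_, _]) =>
      m.asInstanceOf[Map[Any, Any]].map { case (k, x) => k -> fill(valueType, x) }
    // Leaf values (ints, strings, booleans) pass through unchanged.
    case (_, leaf) => leaf
  }
}
```

Because the null check runs before any struct expansion, a null `s` or a null array element stays null instead of becoming a struct of nulls, which is what the null-struct tests assert. The model does not cover the DEFAULT-value or `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES` paths exercised by the negative tests.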
diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index 2aa6cb885ca3..36fda2b50688 100644
--- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold
spark.sql.inMemoryColumnarStorage.partitionPruning
spark.sql.inMemoryTableScanStatistics.enable
+spark.sql.insertNestedTypeCoercion.enabled
spark.sql.join.preferSortMergeJoin
spark.sql.json.enableExactStringParsing
spark.sql.json.enablePartialResults
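The two "extra and missing" tests in the suite rely on by-name matching rather than column counts: source and target both have three columns (or struct fields), yet one name is new and one is absent. The hypothetical helper below (not a Spark API) sketches that reconciliation for a single level of names, assuming exact, case-sensitive matching and assuming new names are appended after the existing target names; Spark's actual resolver also honours case-sensitivity settings and DEFAULT values, which this sketch ignores.

```scala
// Hypothetical sketch of by-name reconciliation for one level of columns or
// struct fields. Not Spark code; names are matched exactly (case-sensitive).
object ByNameReconcile {
  // evolved:    target names followed by source-only names (appended in source order)
  // added:      names that schema evolution would add to the target
  // nullFilled: target names absent from the source, to be filled with null
  final case class Plan(evolved: List[String], added: List[String], nullFilled: List[String])

  def plan(target: List[String], source: List[String]): Plan = {
    val added = source.filterNot(target.contains)
    val missing = target.filterNot(source.contains)
    Plan(target ++ added, added, missing)
  }
}
```

For the top-level test, `plan(List("id", "salary", "dep"), List("id", "dep", "active"))` reports `active` as added and `salary` as null-filled, matching the expected `Row(1, null, "engineering", true)`. Comparing only arity would report no mismatch here, since both sides have three entries.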