This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 25ffaf7bd24a [SPARK-57113][SDP] Prevent AutoCDC keys from changing across SDP runs
25ffaf7bd24a is described below
commit 25ffaf7bd24a4e18072cff7a04cdd652d1c29994
Author: AnishMahto <[email protected]>
AuthorDate: Thu May 28 10:39:14 2026 -0700
[SPARK-57113][SDP] Prevent AutoCDC keys from changing across SDP runs
### What changes were proposed in this pull request?
**Problem**
An AutoCDC flow's key columns determine how rows are matched during the
merge into the target. If a user changes the declared keys between pipeline
runs without a full refresh, the merge silently mis-routes rows (updates become
inserts and vice versa) and corrupts the target. We need to detect a key change
between runs and fail fast.
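A toy, in-memory model of a keyed upsert shows why a silent key change corrupts the target. It is a hypothetical sketch, not Spark's MERGE or the AutoCDC SCD1 processor. When rows are matched on `id`, a change event updates the existing row. When the keys are widened to `(id, region)`, the same event no longer matches and is inserted instead.

```scala
// Toy model of a keyed upsert. Hypothetical and simplified: it is NOT Spark's MERGE or the
// AutoCDC SCD1 processor, and it ignores sequencing, deletes, and the auxiliary table.
object KeyedUpsertSketch {
  type Row = Map[String, String]

  /** Upserts `incoming` into `target`, matching rows on the values of the `keys` columns. */
  def upsert(target: Vector[Row], incoming: Row, keys: Seq[String]): Vector[Row] = {
    def keyOf(row: Row): Seq[Option[String]] = keys.map(row.get)
    val matchIdx = target.indexWhere(row => keyOf(row) == keyOf(incoming))
    if (matchIdx >= 0) target.updated(matchIdx, incoming) // matched: update in place
    else target :+ incoming                                // not matched: insert a new row
  }
}
```

Take an existing row `(id = 1, region = us)` and an event `(id = 1, region = eu)`. With `keys = Seq("id")` the event replaces the row, so one row remains. With `keys = Seq("id", "region")` the event is appended, leaving two rows for `id = 1`.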
**Proposed Solution**
Record the resolved key column names as a JSON array in a reserved property
on the auxiliary table when it is first created. On subsequent runs, validate
the flow's declared keys (names and data types) against this recorded value
and fail with `AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT` on mismatch.
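The validation rule can be sketched without Spark. This is a hypothetical, simplified model under the following assumptions:

- `KeyField` and its string type names stand in for Spark's `StructField`/`DataType`.
- Plain string equality stands in for `DataType.sameType`.
- The resolver defaults to case-insensitive matching, which mirrors Spark's behavior when `spark.sql.caseSensitive` is false.

Like the validator in this commit, it compares the keys as a set. The arity must match, every expected name must have a recorded counterpart, and the data types must agree.

```scala
// Hypothetical, Spark-free model of the key drift check; not the actual validator.
object KeyDriftSketch {
  /** A key column name paired with a simplified type name, e.g. KeyField("id", "INT"). */
  final case class KeyField(name: String, dataType: String)

  /** Case-insensitive name comparison (stand-in for the session resolver). */
  val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)

  /** True if `recorded` differs from `expected` in arity, key names, or key types. */
  def drifted(
      expected: Seq[KeyField],
      recorded: Seq[KeyField],
      resolver: (String, String) => Boolean = caseInsensitive): Boolean =
    recorded.length != expected.length ||         // a key was added or dropped
      expected.exists { e =>
        recorded.find(r => resolver(r.name, e.name)) match {
          case None    => true                     // a key was renamed or replaced
          case Some(r) => r.dataType != e.dataType // a key changed type
        }
      }
}
```

Under this model, reordering the keys or changing only their case is not drift. Adding, dropping, renaming, or retyping a key is drift.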
**Implementation notes**
Aux-table creation switched from `CREATE TABLE` DDL to the programmatic V2
`TableCatalog.createTable` API, so the JSON property reaches storage verbatim
(no SQL string-literal escaping layer). Three new structured sub-classes under
`AUTOCDC_INVALID_STATE` cover corrupted-metadata cases (e.g. caused by user
tampering with the auxiliary table), all recommending a full refresh as the
remedy.
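The property value is passed to the catalog as a plain map entry, so the only escaping it goes through is JSON's own. The commit's `serializeKeyColumnNames`/`parseKeyColumnNames` helpers use json4s. The dependency-free codec below is a hypothetical stand-in (`KeyNamesJsonSketch` is not part of Spark) that illustrates the same contract:

- Names containing quotes, backslashes, or control characters round-trip verbatim.
- Anything other than a single JSON array of strings parses to `None`.

```scala
// Hypothetical, dependency-free stand-in for the json4s-based helpers in this commit.
// It encodes key column names as a JSON array of strings and decodes them strictly.
object KeyNamesJsonSketch {

  /** Encodes names as a compact JSON array, e.g. Seq("id", "region") -> ["id","region"]. */
  def serialize(names: Seq[String]): String =
    names.map(quote).mkString("[", ",", "]")

  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c if c < ' ' => sb.append("\\u%04x".format(c.toInt)) // other control chars
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  /** Returns Some(names) only if `raw` is exactly one JSON array of strings, else None. */
  def parse(raw: String): Option[Seq[String]] = {
    var i = 0
    def skipWs(): Unit = while (i < raw.length && raw(i).isWhitespace) i += 1
    def accept(c: Char): Boolean = {
      skipWs()
      if (i < raw.length && raw(i) == c) { i += 1; true } else false
    }
    def isHex4(from: Int): Boolean =
      from + 4 <= raw.length &&
        raw.substring(from, from + 4).forall(ch => Character.digit(ch, 16) >= 0)

    // Parses one JSON string literal starting at the next non-whitespace character.
    def str(): Option[String] = {
      if (!accept('"')) return None
      val sb = new StringBuilder
      while (i < raw.length) {
        val c = raw(i)
        if (c == '"') { i += 1; return Some(sb.toString) }
        else if (c == '\\') {
          if (i + 1 >= raw.length) return None
          raw(i + 1) match {
            case '"'  => sb.append('"')
            case '\\' => sb.append('\\')
            case '/'  => sb.append('/')
            case 'b'  => sb.append('\b')
            case 'f'  => sb.append('\f')
            case 'n'  => sb.append('\n')
            case 'r'  => sb.append('\r')
            case 't'  => sb.append('\t')
            case 'u' if isHex4(i + 2) =>
              sb.append(Integer.parseInt(raw.substring(i + 2, i + 6), 16).toChar)
              i += 4 // skip the four hex digits; the two-char escape prefix is skipped below
            case _ => return None
          }
          i += 2
        } else if (c < ' ') return None // raw control characters are not valid JSON
        else { sb.append(c); i += 1 }
      }
      None // unterminated string
    }

    if (!accept('[')) return None
    val names = scala.collection.mutable.ListBuffer.empty[String]
    if (!accept(']')) {
      var done = false
      while (!done) {
        str() match {
          case Some(s) => names += s
          case None    => return None
        }
        if (accept(']')) done = true
        else if (!accept(',')) return None
      }
    }
    skipWs()
    if (i == raw.length) Some(names.toList) else None
  }
}
```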
### Why are the changes needed?
To prevent users from incorrectly changing AutoCDC keys across runs without
a full refresh.
### Does this PR introduce _any_ user-facing change?
No, unreleased change.
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
Claude Opus 4.7
Closes #56160 from AnishMahto/prevent-autocdc-key-drift-v2.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 04190d01836d300f43ec4696f3f6a1e9b87cb4b8)
Signed-off-by: DB Tsai <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 28 +
.../spark/sql/pipelines/graph/FlowExecution.scala | 263 ++++++++--
.../sql/pipelines/util/PipelinesCatalogUtils.scala | 52 ++
.../graph/AutoCdcAuxiliaryTableSuite.scala | 100 ++++
.../graph/AutoCdcGraphExecutionTestMixin.scala | 10 +-
.../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala | 90 +++-
.../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 566 +++++++++++++++++++++
.../graph/AutoCdcScd1MultiPipelineSuite.scala | 61 +++
.../graph/AutoCdcScd1SchemaEvolutionSuite.scala | 9 +-
9 files changed, 1130 insertions(+), 49 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index d66692a3b22e..f9dbae413ac9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -209,6 +209,34 @@
],
"sqlState" : "22023"
},
+ "AUTOCDC_INVALID_STATE" : {
+ "message" : [
+ "AutoCDC flow <flowName> detected an invalid state:"
+ ],
+ "subClass" : {
+ "AUXILIARY_TABLE_KEY_COLUMN_MISSING" : {
+ "message" : [
+ "The auxiliary table <auxTableName> is missing key column <keyColumnName> that is recorded in its <propertyName> table property. The auxiliary table schema may be corrupted or have been modified externally. Perform a full refresh of the target table to recreate the auxiliary table."
+ ]
+ },
+ "AUXILIARY_TABLE_PROPERTY_MALFORMED" : {
+ "message" : [
+ "The auxiliary table <auxTableName> has a malformed <propertyName> property with raw value '<rawValue>'. The property must be a JSON array of strings (e.g. '[\"id\",\"region\"]'). The auxiliary table metadata may be corrupted or have been modified externally. Perform a full refresh of the target table to recreate the auxiliary table."
+ ]
+ },
+ "AUXILIARY_TABLE_PROPERTY_MISSING" : {
+ "message" : [
+ "The auxiliary table <auxTableName> is missing the required <propertyName> table property; cannot validate AutoCDC key columns. The auxiliary table metadata may be corrupted or have been modified externally. Perform a full refresh of the target table to recreate the auxiliary table."
+ ]
+ },
+ "KEY_SCHEMA_DRIFT" : {
+ "message" : [
+ "The AutoCDC flow's current key columns <expectedKeySchema> do not match the keys recorded in the auxiliary table <auxTableName> (recorded keys <recordedKeySchema>). AutoCDC does not support changing key columns or their types across incremental pipeline runs. To change keys, perform a full refresh of the target table."
+ ]
+ }
+ },
+ "sqlState" : "42000"
+ },
"AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the AutoCDC key column `<keyColumnName>` is not present in the flow's selected source schema. AutoCDC requires every key column to be present in the source change-data feed and retained by any configured column selection."
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index ea151830f544..0d1c33be2172 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}
+import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import org.apache.spark.SparkException
@@ -29,8 +30,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRowLevelOperations, TableCatalog}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsRowLevelOperations, Table => CatalogTable, TableCatalog, TableInfo}
import org.apache.spark.sql.pipelines.autocdc.{
AutoCdcReservedNames,
ChangeArgs,
@@ -38,7 +38,7 @@ import org.apache.spark.sql.pipelines.autocdc.{
Scd1ForeachBatchHandler
}
import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
-import org.apache.spark.sql.pipelines.util.SparkSessionUtils
+import org.apache.spark.sql.pipelines.util.{PipelinesCatalogUtils, SparkSessionUtils}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ThreadUtils
@@ -330,11 +330,51 @@ object AutoCdcAuxiliaryTable {
* Reserved table property key set on the auxiliary table to record which SCD strategy it
* serves.
*/
- val scdTypePropertyKey: String = s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type"
+ val scdTypePropertyKey: String = s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scdType"
+
+ /**
+ * Table property recording the auxiliary table's unquoted AutoCDC key column names as a JSON
+ * string array (e.g. `["id","region"]`). Written once when the auxiliary table is created and is
+ * considered immutable; full-refresh is the only way to change it.
+ */
+ val keyColumnNamesProperty: String =
+ s"${PipelinesTableProperties.pipelinesPrefix}autocdc.keyColumnNames"
+
+ /**
+ * Serialize key column names to the JSON form stored at [[keyColumnNamesProperty]].
+ * Round-trips an empty list as `[]`; callers are expected to enforce a non-empty key set
+ * upstream.
+ */
+ def serializeKeyColumnNames(names: Seq[String]): String = {
+ import org.json4s.JsonAST.{JArray, JString}
+ import org.json4s.jackson.JsonMethods.compact
+ compact(JArray(names.map(JString(_)).toList))
+ }
+
+ /**
+ * Parse a [[keyColumnNamesProperty]] value. `None` if it is not a JSON array of strings.
+ * Round-trips an empty list as `[]`; callers are expected to enforce a non-empty key set
+ * upstream.
+ */
+ def parseKeyColumnNames(raw: String): Option[Seq[String]] = {
+ import org.json4s.JsonAST.{JArray, JString}
+ import org.json4s.jackson.JsonMethods.parse
+ val parsed = try Some(parse(raw)) catch { case NonFatal(_) => None }
+ parsed.flatMap {
+ case JArray(elems) =>
+ val names = elems.collect { case JString(s) => s }
+ if (names.size == elems.size) Some(names) else None
+ case _ => None
+ }
+ }
}
/**
* Base trait for AutoCDC merge-based write flows.
+ *
+ * Today, this trait and its children manage auxiliary table creation and validation across
+ * pipeline executions. Eventually we should evolve DatasetManager to be aware of the concept of
+ * auxiliary tables, and streamline creation/validation there.
*/
trait AutoCdcMergeWriteBase {
/** The spark session the AutoCDC flow is going to be planned in. */
@@ -343,6 +383,9 @@ trait AutoCdcMergeWriteBase {
/** The destination (target) table entity the AutoCDC flow will be writing to. */
protected def destination: Table
+ /** The AutoCDC flow's identifier, used as `flowName` in error messages emitted by this mixin. */
+ protected def identifier: TableIdentifier
+
/** The AutoCDC flow's [[ChangeArgs]] (keys, sequencing, columnSelection, ...). */
protected def changeArgs: ChangeArgs
@@ -350,37 +393,82 @@ trait AutoCdcMergeWriteBase {
protected def auxiliaryTableSchema: StructType
/**
- * Idempotently create the auxiliary table for [[destination]] if it does not already exist
- * and return its [[TableIdentifier]].
+ * Create the auxiliary table for [[destination]] if it does not already exist and return its
+ * [[TableIdentifier]].
*
- * Note that this is `CREATE TABLE IF NOT EXISTS`: when the aux table already exists, its
- * schema is left untouched and `auxiliaryTableSchema` is ignored. For SCD1, they keys must be
- * invariant across executions and the CDC metadata will always be present, so this is correct.
+ * When the aux table already exists, its schema and properties are left untouched. For SCD1
+ * the keys must be invariant across executions and the CDC metadata is always present, so
+ * this is correct; drift validation reads the recorded `keyColumnNamesProperty` to enforce
+ * the invariant before this method is called.
*/
protected def createAuxiliaryTableIfNotExists(spark: SparkSession): TableIdentifier = {
val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
- // The auxiliary table inherits the target's format so MERGE semantics line up. When the
- // target's format is unspecified (None), omit the USING clause and fall back to the
- // session's default source provider.
- val usingClause = destination.format.map(fmt => s"USING $fmt").getOrElse("")
- val tblPropertiesClause =
- s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.scdTypePropertyKey}' = " +
- s"'${changeArgs.storedAsScdType.label}')"
- spark.sql(
- s"""CREATE TABLE IF NOT EXISTS
- |${auxIdent.quotedString}
- |(${auxiliaryTableSchema.toDDL}) $usingClause $tblPropertiesClause""".stripMargin
- )
+ val (catalog, v2Identifier) = PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
+
+ if (!catalog.tableExists(v2Identifier)) {
+ val properties = scala.collection.mutable.Map.empty[String, String]
+
+ // Inherit the target's format so MERGE semantics line up. When unspecified, omit the
+ // provider so the catalog falls back to its default.
+ destination.format.foreach { fmt => properties(TableCatalog.PROP_PROVIDER) = fmt }
+
+ // Record which SCD strategy this auxiliary table serves so downstream readers can
+ // identify it without having to inspect the schema.
+ properties(AutoCdcAuxiliaryTable.scdTypePropertyKey) = changeArgs.storedAsScdType.label
+
+ // Persist the AutoCDC key column names as a JSON list on first creation. The value
+ // is stored verbatim by the catalog.
+ properties(AutoCdcAuxiliaryTable.keyColumnNamesProperty) =
+ AutoCdcAuxiliaryTable.serializeKeyColumnNames(auxiliaryKeyColumnNames)
+
+ // Table creation is not atomic with the table exists check, and [[createTable]] will fail
+ // with TableAlreadyExistsException if some asynchronous process creates the table between
+ // the [[tableExists]] check and [[createTable]]. This is both rare (we don't support
+ // multi-AutoCDC-flow targets so there are no race conditions within a single pipeline) and
+ // acceptable - users can cleanly retry the failed flow when this happens. SQL offers an
+ // atomic CREATE IF NOT EXISTS, but would require special casing of the table properties
+ // in DDL and we would lose compile-time syntax and type safety.
+ catalog.createTable(
+ v2Identifier,
+ new TableInfo.Builder()
+ .withColumns(CatalogV2Util.structTypeToV2Columns(auxiliaryTableSchema))
+ .withProperties(properties.asJava)
+ .build()
+ )
+ }
auxIdent
}
+ /**
+ * Returns the resolved AutoCDC key column names as they appear in the auxiliary schema, in
+ * `changeArgs.keys` declaration order.
+ */
+ private def auxiliaryKeyColumnNames: Seq[String] = {
+ val resolver = spark.sessionState.conf.resolver
+ changeArgs.keys.map { key =>
+ auxiliaryTableSchema.fields
+ .find(field => resolver(field.name, key.name))
+ .map(_.name)
+ .getOrElse(
+ // This should never happen at this point, as [[AutoCdcMergeFlow]] should have validated
+ // all changeArgs.keys exist in the deduced aux/target table schemas by now.
+ throw SparkException.internalError(
+ s"AutoCDC key column '${key.name}' is missing from the auxiliary table schema " +
+ s"for flow ${identifier.unquotedString} writing to target " +
+ s"${destination.identifier.quotedString}."
+ )
+ )
+ }
+ }
+
/**
* Validate that the target table's underlying connector implements
* [[SupportsRowLevelOperations]], which is the V2 connector contract for MERGE/UPDATE/DELETE
* with rewrite - all operations that the AutoCDC transformation executes.
*/
protected def requireDestinationSupportsRowLevelOps(): Unit = {
- val (catalog, v2Identifier) = resolveTableCatalog(spark, destination.identifier)
+ val (catalog, v2Identifier) =
+ PipelinesCatalogUtils.resolveTableCatalog(spark, destination.identifier)
val destinationTable = catalog.loadTable(v2Identifier)
if (!destinationTable.isInstanceOf[SupportsRowLevelOperations]) {
@@ -399,23 +487,123 @@ trait AutoCdcMergeWriteBase {
}
}
- private def resolveTableCatalog(
- spark: SparkSession,
- ident: TableIdentifier): (TableCatalog, Identifier) = {
- val catalogManager = spark.sessionState.catalogManager
- val catalogPlugin = ident.catalog
- .map(catalogManager.catalog)
- .getOrElse(catalogManager.currentCatalog)
- val catalog = catalogPlugin match {
- case t: TableCatalog => t
- case _ => throw QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+ /**
+ * If the auxiliary table for this flow's destination already exists, validate that the
+ * AutoCDC keys the flow expects line up with the keys recorded in the auxiliary
+ * table. On a fresh pipeline (or after a full refresh dropped the auxiliary), the
+ * auxiliary is absent and there's nothing to drift from, so this is a no-op.
+ */
+ protected def validateNoAutoCdcKeyDriftIfAuxTableExists(): Unit = {
+ val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
+ val (catalog, v2Identifier) = PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
+ if (catalog.tableExists(v2Identifier)) {
+ validateNoAutoCdcKeyDrift(catalog.loadTable(v2Identifier), auxIdent)
+ }
+ }
+
+ /**
+ * Validate that the AutoCDC key columns the flow expects match the keys recorded in the
+ * existing auxiliary table at [[auxIdent]] as a set: same arity, same set of names (per the
+ * session resolver), same per-name `dataType`s.
+ */
+ private def validateNoAutoCdcKeyDrift(
+ existingAuxTable: CatalogTable,
+ auxIdent: TableIdentifier): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+ val existingAuxSchema = CatalogV2Util.v2ColumnsToStructType(existingAuxTable.columns())
+
+ // The expected key fields are looked up in [[auxiliaryTableSchema]], which by construction
+ // contains every key column with its source-derived dataType. We deliberately do not look
+ // them up in [[existingAuxSchema]] - that's the recorded side, and conflating the two
+ // sides would mask drift.
+ val expectedKeyFields: Seq[StructField] = changeArgs.keys.map { key =>
+ auxiliaryTableSchema.fields
+ .find(field => resolver(field.name, key.name))
+ .getOrElse(
+ // Construction of [[auxiliaryTableSchema]] already enforces all of the user-specified
+ // keys are present, so if we don't find a key it is truly an internal error.
+ throw SparkException.internalError(
+ s"Key column '${key.name}' was not found in the AutoCDC auxiliary table schema."
+ )
+ )
+ }
+ val recordedKeyNames = parseRecordedKeyColumnNames(existingAuxTable, auxIdent)
+ val recordedKeyFields: Seq[StructField] = recordedKeyNames.map { name =>
+ existingAuxSchema.fields
+ .find(field => resolver(field.name, name))
+ .getOrElse(
+ // Either an implementation bug or, more likely, the user has corrupted the auxiliary
+ // table schema (e.g. dropped the key column). The remedy is full-refresh in either
+ // case.
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_KEY_COLUMN_MISSING",
+ messageParameters = Map(
+ "flowName" -> identifier.unquotedString,
+ "auxTableName" -> auxIdent.unquotedString,
+ "keyColumnName" -> name,
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+ )
+ )
+ )
+ }
+
+ val drifted =
+ // Arity drift (added or dropped keys).
+ recordedKeyFields.length != expectedKeyFields.length ||
+ // Name or dataType drift: every expected key must have a same-name (resolver-aware)
+ // recorded counterpart with an equivalent dataType. Columns changing nullability and
+ // metadata in the schema are intentionally tolerated, although null key values during
+ // microbatch execution will be invalidated regardless.
+ expectedKeyFields.exists { expected =>
+ recordedKeyFields.find(rf => resolver(rf.name, expected.name)) match {
+ case None => true
+ case Some(recorded) => !recorded.dataType.sameType(expected.dataType)
+ }
+ }
+
+ if (drifted) {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ messageParameters = Map(
+ "flowName" -> identifier.unquotedString,
+ "auxTableName" -> auxIdent.unquotedString,
+ "expectedKeySchema" -> StructType(expectedKeyFields).toDDL,
+ "recordedKeySchema" -> StructType(recordedKeyFields).toDDL
+ )
+ )
+ }
+ }
+
+ /**
+ * Read the [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] off an existing auxiliary table
+ * and parse it into the ordered list of recorded AutoCDC key column names.
+ */
+ private def parseRecordedKeyColumnNames(
+ existingAuxTable: CatalogTable,
+ auxIdent: TableIdentifier): Seq[String] = {
+ val rawKeyColumnNamesStr = Option(
+ existingAuxTable.properties().get(AutoCdcAuxiliaryTable.keyColumnNamesProperty)
+ ).getOrElse {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MISSING",
+ messageParameters = Map(
+ "flowName" -> identifier.unquotedString,
+ "auxTableName" -> auxIdent.unquotedString,
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+ )
+ )
}
- val namespace = ident.database.getOrElse(
- throw SparkException.internalError(
- s"Cannot resolve table identifier ${ident.quotedString}: namespace is unspecified."
+ AutoCdcAuxiliaryTable.parseKeyColumnNames(rawKeyColumnNamesStr).getOrElse {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MALFORMED",
+ messageParameters = Map(
+ "flowName" -> identifier.unquotedString,
+ "auxTableName" -> auxIdent.unquotedString,
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty,
+ "rawValue" -> rawKeyColumnNamesStr
+ )
)
- )
- (catalog, Identifier.of(Array(namespace), ident.table))
+ }
}
}
@@ -435,6 +623,7 @@ class Scd1MergeStreamingWrite(
) extends StreamingFlowExecution with AutoCdcMergeWriteBase {
requireDestinationSupportsRowLevelOps()
+ validateNoAutoCdcKeyDriftIfAuxTableExists()
override def getOrigin: QueryOrigin = flow.origin
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala
new file mode 100644
index 000000000000..8df9f128a25d
--- /dev/null
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/** Catalog-resolution helpers shared across the pipelines module. */
+object PipelinesCatalogUtils {
+
+ /**
+ * Resolve a v1 [[TableIdentifier]] to a `(TableCatalog, Identifier)` pair usable against the
+ * v2 connector APIs. If `ident.catalog` is unset, falls back to the session's
+ * `currentCatalog`.
+ */
+ def resolveTableCatalog(
+ spark: SparkSession,
+ ident: TableIdentifier): (TableCatalog, Identifier) = {
+ val catalogManager = spark.sessionState.catalogManager
+ val catalogPlugin = ident.catalog
+ .map(catalogManager.catalog)
+ .getOrElse(catalogManager.currentCatalog)
+ val catalog = catalogPlugin match {
+ case t: TableCatalog => t
+ case _ => throw QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+ }
+ val namespace = ident.database.getOrElse(
+ throw SparkException.internalError(
+ s"Cannot resolve table identifier ${ident.quotedString}: namespace is unspecified."
+ )
+ )
+ (catalog, Identifier.of(Array(namespace), ident.table))
+ }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala
new file mode 100644
index 000000000000..9fb6070c01e7
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for the [[AutoCdcAuxiliaryTable]] companion object, in particular the
+ * `serializeKeyColumnNames` / `parseKeyColumnNames` round-trip helpers used to persist the
+ * AutoCDC key column names as a JSON-encoded reserved table property on the auxiliary table.
+ *
+ * These tests are intentionally session-less: the helpers are pure functions on `String` and
+ * `Seq[String]`, and verifying their byte-for-byte round-trip contract requires no Spark
+ * runtime. End-to-end persistence (DDL -> catalog -> SHOW TBLPROPERTIES) is covered by
+ * `AutoCdcScd1AuxiliaryTableDurabilitySuite`; drift-validator behavior over the parsed
+ * property is covered by `AutoCdcScd1KeyDriftSuite`.
+ */
+class AutoCdcAuxiliaryTableSuite extends SparkFunSuite {
+
+ // The drift validator stores key column names in a table property as a JSON array of strings.
+ // These round-trip tests verify that identifier text is preserved verbatim through
+ // serialize -> parse, including characters that JSON itself must escape (`"`, `\`, control
+ // chars) and characters that JSON does not touch but that downstream interpolation might
+ // (`'`, ` `, `.`, backtick). Storage at the table property level is solely the JSON layer's
+ // concern -- SQL identifier quoting (backticks) is never part of the stored bytes.
+
+ private def assertKeyColumnNamesRoundTrip(names: Seq[String]): Unit = {
+ val json = AutoCdcAuxiliaryTable.serializeKeyColumnNames(names)
+ assert(
+ AutoCdcAuxiliaryTable.parseKeyColumnNames(json).contains(names),
+ s"round-trip failed: input=${names}, serialized=${json}"
+ )
+ }
+
+ test("serializeKeyColumnNames/parseKeyColumnNames round-trip preserves plain ASCII names") {
+ assertKeyColumnNamesRoundTrip(Seq("id"))
+ assertKeyColumnNamesRoundTrip(Seq("id", "region"))
+ assertKeyColumnNamesRoundTrip(Seq("id", "region", "country"))
+ }
+
+ test("serializeKeyColumnNames/parseKeyColumnNames round-trip preserves the empty list") {
+ // Empty key sets are not user-reachable (AutoCdcMergeFlow rejects them upstream), but the
+ // helpers themselves must round-trip a `[]` JSON array faithfully.
+ assertKeyColumnNamesRoundTrip(Seq.empty)
+ }
+
+ test("serializeKeyColumnNames/parseKeyColumnNames preserves names containing JSON-escaped " +
+ "characters (quote, backslash, control chars)") {
+ // JSON serializer must escape `"` -> `\"`, `\` -> `\\`, and control chars; the parser
+ // must invert those escapes and yield the original literal bytes.
+ assertKeyColumnNamesRoundTrip(Seq("a\"b"))
+ assertKeyColumnNamesRoundTrip(Seq("a\\b"))
+ assertKeyColumnNamesRoundTrip(Seq("a\nb"))
+ assertKeyColumnNamesRoundTrip(Seq("a\tb"))
+ // Mixed: every JSON-escaped class in a single name.
+ assertKeyColumnNamesRoundTrip(Seq("a\"b\\c\nd"))
+ }
+
+ test("serializeKeyColumnNames/parseKeyColumnNames preserves names containing characters " +
+ "that JSON does not escape (single quote, dot, space, backtick)") {
+ // JSON does not escape these, but they are common in real-world identifiers (especially
+ // when users backtick-quote at the API boundary). They must flow through verbatim.
+ assertKeyColumnNamesRoundTrip(Seq("it's"))
+ assertKeyColumnNamesRoundTrip(Seq("a.b"))
+ assertKeyColumnNamesRoundTrip(Seq("name with spaces"))
+ assertKeyColumnNamesRoundTrip(Seq("a`b"))
+ // Mixed: a single composite key whose pieces collectively touch every "passes verbatim"
+ // class.
+ assertKeyColumnNamesRoundTrip(Seq("it's", "name with spaces", "a.b.c", "back`tick"))
+ }
+
+ test("parseKeyColumnNames returns None for inputs that are not a JSON array of strings") {
+ // None of these are a top-level JSON array of strings; the parser must reject every shape
+ // with `None` so callers can surface a structured INTERNAL_ERROR with consistent wording.
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("not-json").isEmpty)
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("").isEmpty)
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("\"id\"").isEmpty) // bare string
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("null").isEmpty)
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("{\"id\": 1}").isEmpty) // object
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[1, 2, 3]").isEmpty) // numbers
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[\"id\", 1]").isEmpty) // mixed types
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[\"id\", null]").isEmpty)
+ assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[[\"id\"]]").isEmpty) // nested array
+ }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 5e2286a4fd56..5ebdb4b4c86d 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -53,10 +53,10 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach {
s"spark.sql.catalog.$catalog",
classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName
)
- // Disable per-flow retries so failure-path tests (e.g. INCOMPATIBLE_DATA) surface the
- // AnalysisException after the first attempt instead of going through the default 2 retries,
- // which would otherwise emit duplicate FAILED events and inflate test runtime without
- // changing the asserted outcome.
+ // Disable per-flow retries so failure-path tests (e.g. KEY_SCHEMA_DRIFT, INCOMPATIBLE_DATA)
+ // surface the AnalysisException after the first attempt instead of going through the default
+ // 2 retries, which would otherwise emit duplicate FAILED events and inflate test runtime
+ // without changing the asserted outcome.
spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "0")
spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace")
}
@@ -98,7 +98,7 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach {
/**
* Walk every [[Throwable]] reachable from `failure` via [[Throwable#getSuppressed]] and
- * [[Throwable#getCause]], searching for the first [[SparkThrowable]] whose
+ * [[Throwable#getCause]] for the first [[SparkThrowable]] whose
* [[SparkThrowable#getCondition]] equals `condition`, then run [[checkError]] against that
* exception with all of its other arguments propagated through.
*/
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 50ff60556a73..5a9f6cb6710b 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -158,6 +158,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// The auxiliary table only contains keys and the metadata column, hence "name" should not be
// included.
assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName))
+ assert(getAuxTableKeyColumnNames(target = "target") == Seq("id"))
}
test("the auxiliary table preserves the user's declared key order, independent of the " +
@@ -169,8 +170,9 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// _cdc_metadata) -- same ordering as the source. The user, however, declares
// `keys = Seq("region", "id")` -- the OPPOSITE order from how those columns appear in
// both the source DF and the target. The auxiliary table should honor the user's
- // declared key order, not either physical column ordering, so subsequent runs
- // positionally compare keys against the same recorded layout.
+ // declared key order, both in the persisted aux schema layout and in the
+ // [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] property value, so
subsequent runs
+ // compare keys against the same recorded layout.
spark.sql(
s"CREATE TABLE $catalog.$namespace.target " +
s"(value STRING, id INT NOT NULL, region STRING NOT NULL, " +
@@ -194,6 +196,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
val auxSchema = spark.table(auxTableNameFor("target")).schema
assert(auxSchema.fieldNames.toSeq ==
Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+ assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id"))
}
test("if the AutoCDC auxiliary table is dropped between runs, it is transparently " +
@@ -238,4 +241,87 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
)
}
+ test("auxiliary key-column-names property survives identifiers containing special " +
+ "characters that exercise both JSON and SQL string-literal escaping") {
+ val session = spark
+ import session.implicits._
+
+ // This test exercises the full identifier-text persistence path with composite keys whose
+ // names collectively cover every escape class:
+ // - `it's` -- single quote: not escaped by JSON; the writer must double it
+ // to `''` to keep the SQL TBLPROPERTIES literal well-formed.
+ // - `name with spaces` -- whitespace identifier: backtick-quoted in DDL, no escaping
+ // needed in the JSON or the property value.
+ // - `a"b` -- literal double quote: JSON escapes as `\"`.
+ // - `c\d` -- literal backslash: JSON escapes as `\\`.
+ // If any layer drops, splits, or misescapes a name, the post-run lookup of the
+ // [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] property either fails to read or
+ // returns a value that is no longer a parseable JSON array of strings.
+ val keyNames = Seq("it's", "name with spaces", "a\"b", "c\\d")
+
+ // SQL DDL identifier rendering: backticks delimit each identifier; an embedded backtick
+ // would have to be escaped by doubling, but none of these names contain one.
+ val targetTableDdl = keyNames
+ .map(name => s"`$name` STRING NOT NULL")
+ .mkString(", ") + s", version BIGINT NOT NULL, $cdcMetadataDdl"
+ spark.sql(s"CREATE TABLE $catalog.$namespace.target ($targetTableDdl)")
+
+ // The AutoCDC API runs every key through `UnqualifiedColumnName.apply`, which calls
+ // `CatalystSqlParser.parseMultipartIdentifier`. To get a single-part identifier whose
+ // text includes special characters, the API caller has to backtick-quote at the boundary;
+ // we mirror that here by wrapping each name in backticks (and doubling any embedded
+ // backtick -- not needed for these names but kept for parity with how a user would call
+ // the API).
+ val backtickQuotedKeys = keyNames.map(name => s"`${name.replace("`", "``")}`")
+
+ // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+ val stream = MemoryStream[(String, String, String, String, Long)]
+ def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(
+ stream.toDF().toDF((keyNames :+ "version"): _*)
+ ),
+ keys = backtickQuotedKeys,
+ sequencing = functions.col("version")
+ ))
+ }
+
+ // Run #1: a single insert with arbitrary non-empty key values.
+ stream.addData(("v1", "v2", "v3", "v4", 1L))
+ runPipeline(buildCtx())
+
+ // The persisted property must round-trip every name byte-for-byte.
+ assert(getAuxTableKeyColumnNames(target = "target") == keyNames)
+
+ // Run #2: same keys, a higher sequence -- drift validation reads the property back, parses
+ // the JSON, and looks up each recorded name in the aux schema. If any layer mangled the
+ // identifier text (lost an escape, dropped a `'`, split on a `.`, ...), validation would
+ // either throw KEY_SCHEMA_DRIFT (name lookup miss) or AUXILIARY_TABLE_KEY_COLUMN_MISSING
+ // (recorded name absent from the aux schema). Completing the second run successfully
+ // proves the round-trip works.
+ stream.addData(("v1", "v2", "v3", "v4", 2L))
+ runPipeline(buildCtx())
+
+ // The persisted property is immutable across non-full-refresh runs, so it must still be
+ // intact after run #2.
+ assert(getAuxTableKeyColumnNames(target = "target") == keyNames)
+ }
+
+ private def getAuxTableKeyColumnNames(target: String): Seq[String] = {
+ val auxName = auxTableNameFor(target)
+ val rows = spark.sql(s"SHOW TBLPROPERTIES $auxName").collect()
+ val prop = rows
+ .find(_.getString(0) == AutoCdcAuxiliaryTable.keyColumnNamesProperty)
+ .getOrElse(fail(
+ s"auxiliary table $auxName is missing the " +
+ s"${AutoCdcAuxiliaryTable.keyColumnNamesProperty} property; got: ${rows.toSeq}"
+ ))
+ AutoCdcAuxiliaryTable.parseKeyColumnNames(prop.getString(1))
+ .getOrElse(fail(
+ s"auxiliary table $auxName has a malformed " +
+ s"${AutoCdcAuxiliaryTable.keyColumnNamesProperty} property: '${prop.getString(1)}'"
+ ))
+ }
}
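The durability test above depends on the key-column-names property holding a JSON array of strings that round-trips arbitrary identifier text. A minimal, dependency-free sketch of that encoding, plus a strict decoder for exactly this shape, follows. `KeyNamesJson`, `encode`, and `decode` are hypothetical names; the real `AutoCdcAuxiliaryTable.parseKeyColumnNames` is not shown in this diff and may well delegate to a JSON library. As the test helper's `getOrElse` suggests, the decoder here signals malformed input with `None` instead of throwing.

```scala
object KeyNamesJson {
  /** Hypothetical encoder: renders `names` as a JSON array of strings (RFC 8259 escaping). */
  def encode(names: Seq[String]): String = names.map(quote).mkString("[", ",", "]")

  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"") // double quote becomes backslash + quote
      case '\\' => sb.append("\\\\") // backslash becomes two backslashes
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c if c < ' ' => // other control characters: 4-hex-digit unicode escape
        sb.append('\\').append('u').append(f"${c.toInt}%04x")
      case c => sb.append(c) // everything else, including ' and spaces, is verbatim
    }
    sb.append('"').toString
  }

  /** Hypothetical decoder: accepts only a JSON array of strings, else returns None. */
  def decode(json: String): Option[Seq[String]] = {
    var i = 0 // cursor into `json`
    def skipWs(): Unit = while (i < json.length && " \t\n\r".indexOf(json(i)) >= 0) i += 1

    def readString(): Option[String] = {
      if (i >= json.length || json(i) != '"') return None
      i += 1 // opening quote
      val sb = new StringBuilder
      while (i < json.length) {
        val c = json(i)
        if (c == '"') { i += 1; return Some(sb.toString) }
        if (c < ' ') return None // raw control characters are not allowed in JSON strings
        if (c != '\\') { sb.append(c); i += 1 }
        else {
          if (i + 1 >= json.length) return None
          json(i + 1) match {
            case '"'  => sb.append('"')
            case '\\' => sb.append('\\')
            case '/'  => sb.append('/')
            case 'b'  => sb.append('\b')
            case 'f'  => sb.append('\f')
            case 'n'  => sb.append('\n')
            case 'r'  => sb.append('\r')
            case 't'  => sb.append('\t')
            case 'u' if i + 6 <= json.length &&
                json.substring(i + 2, i + 6).forall(h => Character.digit(h, 16) >= 0) =>
              sb.append(Integer.parseInt(json.substring(i + 2, i + 6), 16).toChar)
              i += 4 // the 4 hex digits; the 2-char escape prefix is skipped below
            case _ => return None
          }
          i += 2
        }
      }
      None // unterminated string
    }

    skipWs()
    if (i >= json.length || json(i) != '[') return None
    i += 1
    skipWs()
    val out = Seq.newBuilder[String]
    if (i < json.length && json(i) == ']') i += 1
    else {
      var done = false
      while (!done) {
        skipWs()
        readString() match {
          case Some(s) => out += s
          case None    => return None
        }
        skipWs()
        if (i < json.length && json(i) == ',') i += 1
        else if (i < json.length && json(i) == ']') { i += 1; done = true }
        else return None
      }
    }
    skipWs()
    if (i == json.length) Some(out.result()) else None // reject trailing garbage
  }
}
```

With the test's four names, `encode` yields `["it's","name with spaces","a\"b","c\\d"]`: only the double quote and the backslash need escaping at the JSON layer. `decode` maps that text back to the original `Seq`, while `decode("not-a-json-array")` returns `None`.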
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
new file mode 100644
index 000000000000..066d8afd5342
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC flow's declared
+ * keys are validated against the auxiliary table's recorded keys at flow execution-init
+ * time. A change in keys across runs without a full refresh corrupts the merge semantics
+ * (rows mis-routed between insert/update); validation detects this and fails fast with a
+ * structured [[AUTOCDC_INVALID_STATE]] error.
+ *
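+ * Most tests seed the auxiliary table with a first pipeline run and then run a second
+ * pipeline with a different shape (new keys, dropped keys, swapped keys); the dataType-drift
+ * and tampering tests instead pre-create the auxiliary table directly and run a single
+ * pipeline. Tests of drifting changes assert on the structured failure; tests of benign
+ * changes (key reorder, nullability or metadata, backticks, case under a case-insensitive
+ * resolver) assert that the run succeeds.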
+ */
+class AutoCdcScd1KeyDriftSuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("a pipeline execution that adds a key column to an existing AutoCDC flow triggers " +
+ "KEY_SCHEMA_DRIFT") {
+ val session = spark
+ import session.implicits._
+
+ // Target table carries both candidate key columns up-front so only the AutoCDC `keys`
+ // declaration differs between the two pipelines.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, region STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1 declares one key (`id`). Aux table is created with schema (id, _cdc_metadata).
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "us", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "region", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Pipeline #2 declares two keys (`region` + `id`) - arity drift.
+ val stream2 = MemoryStream[(Int, String, Long)]
+ stream2.addData((1, "us", 2L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "region", "version")),
+ keys = Seq("region", "id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ // `region` is nullable here because Scala `String` is a reference type and the
+ // [[MemoryStream]] tuple encoder treats reference types as nullable. Only Scala
+ // primitives (`Int`, `Long`, ...) yield `NOT NULL` columns.
+ "expectedKeySchema" -> "region STRING,id INT NOT NULL",
+ "recordedKeySchema" -> "id INT NOT NULL"
+ )
+ )
+ }
+
+ test("a pipeline execution that drops a key column from an existing AutoCDC flow triggers " +
+ "KEY_SCHEMA_DRIFT") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1 declares two keys [region, id]. Without a strict set-equality check, the
+ // dropped `region` would slip through, because the remaining key `id` still matches a
+ // recorded key.
+ val stream1 = MemoryStream[(String, Int, Long)]
+ stream1.addData(("us", 1, 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("region", "id", "version")),
+ keys = Seq("region", "id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Pipeline #2 declares only [id] - arity drift.
+ val stream2 = MemoryStream[(String, Int, Long)]
+ stream2.addData(("us", 1, 2L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("region", "id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "expectedKeySchema" -> "id INT NOT NULL",
+ // `region` is nullable here because Scala `String` is a reference type; see the
+ // analogous comment in the "adds a key column" test above.
+ "recordedKeySchema" -> "region STRING,id INT NOT NULL"
+ )
+ )
+ }
+
+ test("a pipeline execution that swaps a key in an existing AutoCDC flow for a different name " +
+ "(same arity) triggers KEY_SCHEMA_DRIFT") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " +
+ s"version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1 declares [id, region].
+ val stream1 = MemoryStream[(Int, String, String, Long)]
+ stream1.addData((1, "us", "USA", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "region", "country", "version")),
+ keys = Seq("id", "region"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Pipeline #2 declares [id, country] - same arity, different key set. An arity-only check
+ // would silently match `id` at position 0 and the swapped `region`/`country` would slip
+ // through; the by-name set comparison must catch it.
+ val stream2 = MemoryStream[(Int, String, String, Long)]
+ stream2.addData((1, "us", "USA", 2L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "region", "country", "version")),
+ keys = Seq("id", "country"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ // `country` and `region` are nullable here because Scala `String` is a reference type;
+ // see the analogous comment in the "adds a key column" test above.
+ "expectedKeySchema" -> "id INT NOT NULL,country STRING",
+ "recordedKeySchema" -> "id INT NOT NULL,region STRING"
+ )
+ )
+ }
+
+ test("a pipeline whose recorded aux key dataType differs from the flow's source dataType " +
+ "triggers KEY_SCHEMA_DRIFT") {
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"""CREATE TABLE ${auxTableNameFor("target")} (id BIGINT NOT NULL, $cdcMetadataDdl) """ +
+ s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["id"]')"""
+ )
+
+ val session = spark
+ import session.implicits._
+ val stream = MemoryStream[(Int, Long)]
+ stream.addData((1, 1L))
+ val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "expectedKeySchema" -> "id INT NOT NULL",
+ "recordedKeySchema" -> "id BIGINT NOT NULL"
+ )
+ )
+ }
+
+ test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift validation") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1 declares keys [a, b] (in that order). Drift validation is order-independent:
+ // the recorded ordering is purely cosmetic for human-readable error messages and must not
+ // gate semantic equivalence, since the merge semantics depend only on the *set* of key
+ // columns and their dataTypes.
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "x", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("a", "b", "version")),
+ keys = Seq("a", "b"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Pipeline #2 declares the same key set in the reversed order [b, a]. Must NOT throw.
+ val stream2 = MemoryStream[(Int, String, Long)]
+ stream2.addData((2, "y", 1L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("a", "b", "version")),
+ keys = Seq("b", "a"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx2)
+ }
+
+ test("a pipeline execution that changes a key column's nullability or metadata in an " +
+ "existing AutoCDC flow does NOT trigger drift") {
+ val session = spark
+ import session.implicits._
+
+ // Drift validation compares (name, dataType) pairs as a set. Nullability and column
+ // metadata are part of [[StructField]] but not part of [[DataType]], so they do not gate
+ // semantic equivalence: only the wire-format data type matters for merge correctness.
+ // Target's `id` is nullable so the second pipeline's nullable-`id` source is accepted.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1: source carries `id INT NOT NULL` (Scala primitive `Int`), no metadata.
+ val stream1 = MemoryStream[(Int, Long)]
+ stream1.addData((1, 1L))
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+ // Pipeline #2: source carries `id INT` (nullable, via `Option[Int]`) AND attaches
+ // non-empty column metadata. Same name and `dataType` as the recorded key, but every
+ // other [[StructField]] aspect (nullability and metadata) differs.
+ val stream2 = MemoryStream[(Option[Int], Long)]
+ stream2.addData((Some(2), 2L))
+ val baseDf = stream2.toDF().toDF("id", "version")
+ val md = new org.apache.spark.sql.types.MetadataBuilder()
+ .putString("description", "primary key")
+ .build()
+ val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), baseDf("version"))
+ runPipeline(buildPipeline("flow_v2", sourceDfWithMetadata, Seq("id")))
+ }
+
+ test("a pipeline execution that wraps an existing AutoCDC flow's key in backticks does NOT " +
+ "trigger drift") {
+ val session = spark
+ import session.implicits._
+
+ // Backticks are a SQL-parse syntactic device, not part of the identifier itself. A user
+ // adding or removing backticks around the same logical column must NOT be detected as drift.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long)]
+ stream1.addData((1, 1L))
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+ val stream2 = MemoryStream[(Int, Long)]
+ stream2.addData((2, 1L))
+ runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("`id`")))
+ }
+
+ test("a pipeline execution that drops backticks around an existing AutoCDC flow's " +
+ "previously-backtick-quoted key does NOT trigger drift") {
+ val session = spark
+ import session.implicits._
+
+ // The reverse direction of the previous test: drift validation must be backtick-invariant
+ // on both the WRITE side (recorded property strips backticks when serializing the key
+ // names in pipeline #1) and the READ side (resolver-aware lookup strips backticks when
+ // pipeline #2's expected keys are matched against the recorded set).
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long)]
+ stream1.addData((1, 1L))
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("`id`")))
+
+ val stream2 = MemoryStream[(Int, Long)]
+ stream2.addData((2, 1L))
+ runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("id")))
+ }
+
+ test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key differs only in case " +
+ "from the recorded key triggers KEY_SCHEMA_DRIFT") {
+ val session = spark
+ import session.implicits._
+
+ // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its behavior on
+ // `Id` vs `id` flips with the session conf. Pin the case-sensitive direction: pipeline #1
+ // seeds the aux table under the default resolver with recorded key `["id"]`, then
+ // pipeline #2 runs under the case-sensitive resolver with key `["Id"]`. Because `Id` and
+ // `id` are distinct identifiers under that resolver, drift validation must fail.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long)]
+ stream1.addData((1, 1L))
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ // Source DF column is `Id` (capital) so the AutoCDC flow's own key-presence check
+ // (`requireKeysPresentInSelectedSchema`) succeeds under case-sensitive analysis.
+ // Drift validation is then the only remaining failure mode and it must fire.
+ val stream2 = MemoryStream[(Int, Long)]
+ stream2.addData((1, 2L))
+ val ctx2 = buildPipeline("flow_v2", stream2.toDF().toDF("Id", "version"), Seq("Id"))
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "expectedKeySchema" -> "Id INT NOT NULL",
+ "recordedKeySchema" -> "id INT NOT NULL"
+ )
+ )
+ }
+ }
+
+ test("under the default (case-insensitive) resolver, an AutoCDC flow whose key differs only " +
+ "in case from the recorded key does NOT trigger drift") {
+ val session = spark
+ import session.implicits._
+
+ // Pairs with the case-sensitive test above: same recorded key, but under the default
+ // resolver the two identifiers are equivalent so drift validation must accept pipeline
+ // #2. This pins the negative direction so a regression that accidentally hard-codes a
+ // case-sensitive resolver in the validator is caught.
+ //
+ // Note that only the *key declaration* (`Seq("Id")`) has different casing here -- the
+ // source DF column name still matches the target's `id` exactly. Changing the source DF
+ // column casing as well would not exercise drift: [[SchemaMergingUtils.mergeSchemas]] is
+ // case-sensitive on column names and would add `Id` as a new column to the target,
+ // producing AMBIGUOUS_REFERENCE during the streaming write rather than letting drift
+ // validation make the call.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long)]
+ stream1.addData((1, 1L))
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+ val stream2 = MemoryStream[(Int, Long)]
+ stream2.addData((1, 2L))
+ runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("Id")))
+ }
+
+ test("a pipeline whose aux table is missing the keyColumnNames property fails with " +
+ "AUXILIARY_TABLE_PROPERTY_MISSING") {
+ // Pre-create the aux table directly without the [[keyColumnNamesProperty]] to simulate
+ // corrupt metadata (e.g. user ran `ALTER TABLE ... UNSET TBLPROPERTIES`). Validation must
+ // surface a structured AUTOCDC_INVALID_STATE error rather than silently mis-validating keys.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val session = spark
+ import session.implicits._
+ val stream = MemoryStream[(Int, Long)]
+ stream.addData((1, 1L))
+ val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MISSING",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+ )
+ )
+ }
+
+ test("a pipeline whose aux table has a malformed keyColumnNames property fails with " +
+ "AUXILIARY_TABLE_PROPERTY_MALFORMED") {
+ // Pre-create the aux table directly with a non-JSON-array property value to simulate
+ // corrupt metadata. Validation must surface a structured AUTOCDC_INVALID_STATE error
+ // rather than letting a parse exception leak.
+ val malformedKeysArray = "not-a-json-array"
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl) " +
+ s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '$malformedKeysArray')"
+ )
+
+ val session = spark
+ import session.implicits._
+ val stream = MemoryStream[(Int, Long)]
+ stream.addData((1, 1L))
+ val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MALFORMED",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty,
+ "rawValue" -> malformedKeysArray
+ )
+ )
+ }
+
+ test("a pipeline whose aux table records a key absent from its schema fails with " +
+ "AUXILIARY_TABLE_KEY_COLUMN_MISSING") {
+ // Pre-create the aux table directly with the [[keyColumnNamesProperty]] pointing at a
+ // column that does not exist in the aux schema. This is either a write-path implementation
+ // bug or external user tampering (e.g. dropping the key column); validation must surface a
+ // structured AUTOCDC_INVALID_STATE error rather than KEY_SCHEMA_DRIFT, because the drift
+ // validator cannot run without resolving every recorded key first.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"""CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl) """ +
+ s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["region"]')"""
+ )
+
+ val session = spark
+ import session.implicits._
+ val stream = MemoryStream[(Int, Long)]
+ stream.addData((1, 1L))
+ val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_KEY_COLUMN_MISSING",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("target"),
+ "keyColumnName" -> "region",
+ "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+ )
+ )
+ }
+
+ /**
+ * Build a single-flow pipeline targeting `cat.ns1.target` with the given source DF and key
+ * column list.
+ */
+ private def buildPipeline(
+ flowName: String,
+ sourceDf: org.apache.spark.sql.classic.DataFrame,
+ keys: Seq[String]): TestGraphRegistrationContext = {
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = flowName,
+ target = "target",
+ query = dfFlowFunc(sourceDf),
+ keys = keys,
+ sequencing = functions.col("version")
+ ))
+ }
+ }
+}
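Taken together, the tests in this suite pin down a drift rule: keys are compared as a set of (name, dataType) pairs, so reordering, nullability, column metadata, and surrounding backticks are ignored, while added, dropped, swapped, or retyped keys are rejected, and name comparison follows the session resolver. The following is a minimal sketch of that rule under simplifying assumptions: `KeyDriftSketch`, `KeyField`, and `sameKeys` are hypothetical names, data types are plain strings, metadata is simply not modeled, the two resolvers imitate the shape of Catalyst's `(String, String) => Boolean` resolver, and `unquote` is a crude stand-in for `CatalystSqlParser.parseMultipartIdentifier` that only handles a single backtick-quoted part. The actual `validateNoAutoCdcKeyDrift` may be structured quite differently.

```scala
object KeyDriftSketch {
  /** Hypothetical stand-in for a key StructField; column metadata is not modeled at all. */
  final case class KeyField(name: String, dataType: String, nullable: Boolean)

  /** Same shape as a resolver: do these two names refer to the same column? */
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  /** Strips one layer of backtick quoting from a single-part identifier. */
  def unquote(name: String): String =
    if (name.length >= 2 && name.startsWith("`") && name.endsWith("`"))
      name.substring(1, name.length - 1).replace("``", "`")
    else name

  /**
   * True when `expected` and `recorded` describe the same key set: equal size, and every
   * field on each side has a counterpart on the other with a resolver-equal (unquoted) name
   * and an identical data type. Order and nullability play no part in the comparison.
   */
  def sameKeys(expected: Seq[KeyField], recorded: Seq[KeyField], resolver: Resolver): Boolean = {
    def matches(a: KeyField, b: KeyField): Boolean =
      resolver(unquote(a.name), unquote(b.name)) && a.dataType == b.dataType
    expected.size == recorded.size &&
      expected.forall(e => recorded.exists(r => matches(e, r))) &&
      recorded.forall(r => expected.exists(e => matches(e, r)))
  }
}
```

The equal-size check combined with matching in both directions is what separates a pure reorder (accepted) from an arity change or a same-arity swap (rejected), mirroring the "adds", "drops", and "swaps" tests above.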
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
index 32f34923c480..0d3f6e954df3 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -293,4 +293,65 @@ class AutoCdcScd1MultiPipelineSuite
)
}
+ test("a second pipeline targeting an existing AutoCDC table with different keys " +
+ "fails with KEY_SCHEMA_DRIFT") {
+ val session = spark
+ import session.implicits._
+
+ // Target table with both candidate keys present so the second pipeline would otherwise
+ // be schema-compatible with the first; only the AutoCDC `keys` differ between flows.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.shared_target " +
+ s"(id INT NOT NULL, name STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1: AutoCDC flow keyed on `id`. Materializes the auxiliary table with schema
+ // (id, _cdc_metadata).
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "alice", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "shared_target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Pipeline #2: completely separate graph, but targets the same physical `shared_target`
+ // table with `keys = Seq("name")`.
+ val stream2 = MemoryStream[(Int, String, Long)]
+ stream2.addData((2, "alice", 1L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "shared_target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+ keys = Seq("name"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+ sqlState = Some("42000"),
+ parameters = Map(
+ "flowName" ->
+ fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+ "auxTableName" -> auxTableNameFor("shared_target"),
+ // Pipeline #2's AutoCDC key resolves from the source DF, where `MemoryStream[(Int, String,
+ // Long)]` produces a nullable StringType for `name`.
+ "expectedKeySchema" -> "name STRING",
+ // Pipeline #1 persisted the aux table from a source DF whose `id` was a non-null Scala
+ // primitive (`Int`), so the recorded key carries `NOT NULL`.
+ "recordedKeySchema" -> "id INT NOT NULL"
+ )
+ )
+ }
}
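The `expectedKeySchema` and `recordedKeySchema` parameters in these tests render each key as `name TYPE`, append ` NOT NULL` for non-nullable fields, and join entries with a bare comma; nullability comes from the `MemoryStream` tuple encoder (Scala primitives are non-null, `String` is nullable). The following minimal sketch reproduces only that string format. `KeySchemaRendering.renderKeySchema` is a hypothetical helper, this diff does not show which Spark function actually produces the message text, and the sketch does not backtick-quote names that would need it.

```scala
object KeySchemaRendering {
  /**
   * Hypothetical renderer: formats (name, sqlType, nullable) triples the way the tests'
   * expected error parameters read, e.g. "region STRING,id INT NOT NULL".
   */
  def renderKeySchema(fields: Seq[(String, String, Boolean)]): String =
    fields.map { case (name, sqlType, nullable) =>
      val suffix = if (nullable) "" else " NOT NULL" // only non-nullable keys are annotated
      s"$name $sqlType$suffix"
    }.mkString(",") // no space after the comma
}
```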
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
index 4c20b21ad57a..2424dbdc4e05 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -31,11 +31,10 @@ import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistratio
import org.apache.spark.sql.test.SharedSparkSession
/**
- * Tests covering AutoCDC's interaction with schema evolution across pipeline runs. The
- * suite documents the supported additive cases (new top-level columns, new nested fields
- * in array-of-struct, broadening / narrowing column selection) and the cases that fail
- * loudly today (subtractive nested evolution, type-incompatible changes, case-only
- * renames).
+ * Tests covering AutoCDC's interaction with non-key schema evolution across pipeline runs. The
+ * suite documents the supported additive cases (new top-level columns, new nested fields in
+ * array-of-struct, broadening / narrowing column selection) and the cases that fail loudly
+ * today (subtractive nested evolution, type-incompatible changes, case-only renames).
*
* These behaviors are largely inherited from the lower layers (`SchemaMergingUtils` for
* schema merge, the v2 writer's column-resolution layer for nested-field handling) rather