This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 7ed7fe52f43f [SPARK-56957][SDP] AutoCDC Flow Execution; Introduce and Integrate SCD1 `Scd1MergeStreamingWrite`
7ed7fe52f43f is described below
commit 7ed7fe52f43f4faa05238bb85a3799d07ddfd05f
Author: AnishMahto <[email protected]>
AuthorDate: Wed May 27 22:50:38 2026 +0800
[SPARK-56957][SDP] AutoCDC Flow Execution; Introduce and Integrate SCD1 `Scd1MergeStreamingWrite`
### What changes were proposed in this pull request?
In order for a pipeline to actually execute an AutoCDC SCD1 flow, the SDP
engine needs a "physical" flow definition that specifies the streaming
transformation SCD1 requires, along with a way to construct that physical flow
from a given "logical" flow.
The `FlowPlanner` is responsible for converting a resolved SCD1 streaming
logical flow into the SCD1 streaming physical flow. The physical flow
implements the SCD1 `foreachBatch` streaming query on the flow input.
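As a rough illustration of the logical-to-physical dispatch described above, the Scala sketch below models it with Spark-free stand-ins. `LogicalAutoCdcFlow`, `TableOutput`, `SinkOutput`, `Scd1MergeStreamingWriteSketch` and `FlowPlannerSketch` are hypothetical names, not Spark APIs. The sketch only mirrors the shape of the `FlowPlanner` change: an SCD1 flow that writes to a table becomes an SCD1 merge write, other destinations are rejected, and SCD2 is reported as unsupported.

```scala
// Hypothetical, Spark-free model of the planner's AutoCDC dispatch.
sealed trait ScdType { def label: String }
object ScdType {
  case object Type1 extends ScdType { val label: String = "SCD1" }
  case object Type2 extends ScdType { val label: String = "SCD2" }
}

// Stand-ins for the kinds of destination a flow can write to.
sealed trait Output
final case class TableOutput(name: String) extends Output
final case class SinkOutput(name: String) extends Output

// Stand-in for a resolved "logical" AutoCDC flow.
final case class LogicalAutoCdcFlow(name: String, scdType: ScdType, destination: Output)

// Stand-in for the "physical" flow produced for SCD1.
final case class Scd1MergeStreamingWriteSketch(flowName: String, target: String)

object FlowPlannerSketch {
  // Left carries an error description; Right carries the planned physical flow.
  def plan(flow: LogicalAutoCdcFlow): Either[String, Scd1MergeStreamingWriteSketch] =
    flow.scdType match {
      case ScdType.Type1 =>
        flow.destination match {
          case TableOutput(target) => Right(Scd1MergeStreamingWriteSketch(flow.name, target))
          case other => Left(s"Unsupported destination $other for flow ${flow.name}")
        }
      case ScdType.Type2 => Left("AUTOCDC_SCD2_NOT_SUPPORTED")
    }
}
```

Returning `Either` keeps the sketch easy to test; the planner in this commit throws exceptions instead.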
Integrating the physical flow unblocks pipeline execution for AutoCDC
flows, which means we also need to fill gaps in auxiliary/target table
management, schema evolution, inter-pipeline validation, etc.
One validation these changes intentionally do not include is a check against
a key set that changes across pipeline invocations; that requires more
design and will be handled in a separate PR.
### Why are the changes needed?
To actually execute AutoCDC SCD1 flow transformations with an SDP pipeline.
Before this change, if an AutoCDC flow was registered with the graph, the graph
analysis engine would throw an unrecognized flow exception.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
With these changes, we can now actually test how an AutoCDC flow interacts
with the rest of SDP. This unlocks a number of features/integrations that we
should test, such as:
- Schema evolution
- Writing to different pipeline dataset types
- Full refresh semantics
- Multiflow, multipipeline, for the same AutoCDC target
- Executing AutoCDC flows over multiple independent pipeline runs
- etc.
As such, I added a new `AutoCdcGraphExecutionTestMixin` providing a
v2-row-level-ops-capable catalog
(`SharedTablesInMemoryRowLevelOperationTableCatalog`) and the standard
fixtures that all AutoCDC E2E suites share, plus six new end-to-end suites
(~30 tests) covering:
- `AutoCdcScd1SinglePipelineSuite` — basic upsert/delete reconciliation in
a single pipeline, plus the `AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE` failure
path.
- `AutoCdcScd1MultiPipelineSuite` — independent targets across pipelines,
downstream readers of an AutoCDC target.
- `AutoCdcScd1FullRefreshSuite` — full refresh wipes target rows + aux
table; sequence comparisons reset; selective refresh isolates state.
- `AutoCdcScd1SchemaEvolutionSuite` — broadening/narrowing column
selection, nullable column addition, type widening/narrowing, nested
struct/array evolution, case-only collisions, etc.
- `AutoCdcScd1AuxiliaryTableDurabilitySuite` — keys-first invariant,
declared key order preserved, multi-run sequence comparisons, transparent
aux-table recreation if dropped.
- `AutoCdcScd1TargetTableDurabilitySuite` — pre-loaded target rows,
late-added CDC metadata column.
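The upsert/delete reconciliation these suites exercise can be approximated with a small in-memory model. The sketch below is a simplification under stated assumptions, not the `Scd1BatchProcessor` logic (which runs a MERGE against the target and auxiliary tables): the target is a map from key to (value, upsert sequence), delete tombstones (the role the auxiliary table plays) live in a second map, and an event wins only if its sequence is strictly greater than both the key's last upsert and last delete sequence. `ChangeEvent`, `Scd1State` and `Scd1Sketch` are hypothetical names.

```scala
// Hypothetical, in-memory model of SCD1 upsert/delete reconciliation.
// value = None encodes a delete event; Some(v) encodes an upsert.
final case class ChangeEvent(key: String, seq: Long, value: Option[String])

final case class Scd1State(
    target: Map[String, (String, Long)], // key -> (current value, upsert sequence)
    tombstones: Map[String, Long]        // key -> latest delete sequence (aux-table role)
)

object Scd1Sketch {
  def applyBatch(state: Scd1State, batch: Seq[ChangeEvent]): Scd1State = {
    // Within one microbatch, only the highest-sequenced event per key can win.
    val winners = batch.groupBy(_.key).values.map(_.maxBy(_.seq))
    winners.foldLeft(state) { (s, e) =>
      val lastUpsert = s.target.get(e.key).map(_._2).getOrElse(Long.MinValue)
      val lastDelete = s.tombstones.getOrElse(e.key, Long.MinValue)
      if (e.seq <= lastUpsert || e.seq <= lastDelete) {
        s // stale (out-of-order) event: ignored
      } else e.value match {
        // Upsert wins: overwrite the whole row and record its sequence.
        case Some(v) => s.copy(target = s.target.updated(e.key, (v, e.seq)))
        // Delete wins: remove the row and remember the delete as a tombstone.
        case None => Scd1State(s.target - e.key, s.tombstones.updated(e.key, e.seq))
      }
    }
  }
}
```

Keeping the tombstone after a delete is what lets a late, lower-sequenced upsert for the same key be rejected instead of resurrecting the row.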
### Was this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh
Closes #56122 from AnishMahto/SPARK-56957-implement-SCD1-execution-streaming-flow.
Lead-authored-by: AnishMahto <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7632d37a50ef725fe1d622ca50975d5ec71d545e)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +
python/pyspark/pipelines/api.py | 5 +
.../pipelines/autocdc/AutoCdcReservedNames.scala | 32 +
.../spark/sql/pipelines/autocdc/ChangeArgs.scala | 16 +-
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 47 +-
.../spark/sql/pipelines/graph/DatasetManager.scala | 14 +
.../apache/spark/sql/pipelines/graph/Flow.scala | 5 +-
.../spark/sql/pipelines/graph/FlowExecution.scala | 208 ++++++
.../spark/sql/pipelines/graph/FlowPlanner.scala | 37 +-
.../sql/pipelines/autocdc/AutoCdcFlowSuite.scala | 14 +-
.../graph/AutoCdcGraphExecutionTestMixin.scala | 213 ++++++
.../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala | 241 +++++++
.../graph/AutoCdcScd1FullRefreshSuite.scala | 245 +++++++
.../graph/AutoCdcScd1MultiPipelineSuite.scala | 296 +++++++++
.../graph/AutoCdcScd1SchemaEvolutionSuite.scala | 733 +++++++++++++++++++++
.../graph/AutoCdcScd1SinglePipelineSuite.scala | 216 ++++++
.../AutoCdcScd1TargetTableDurabilitySuite.scala | 159 +++++
.../graph/ConnectInvalidPipelineSuite.scala | 2 +-
18 files changed, 2455 insertions(+), 34 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index b792afc47b57..1c03cea7fcd3 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -256,6 +256,12 @@
],
"sqlState" : "0A000"
},
+ "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE" : {
+ "message" : [
+ "Cannot start AutoCDC flow: the target table <tableName> (format: <format>) does not support row-level operations. AutoCDC requires a target backed by a connector that supports MERGE."
+ ],
+ "sqlState" : "0A000"
+ },
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py
index 578b28ec3793..084547f4c2b1 100644
--- a/python/pyspark/pipelines/api.py
+++ b/python/pyspark/pipelines/api.py
@@ -556,6 +556,11 @@ def create_auto_cdc_flow(
Note that for keys, sequence_by, column_list, and except_column_list the arguments have to
be column identifiers without qualifiers, e.g. they cannot be col("sourceTable.keyId").
+ The set and types of `keys` are part of the Auto CDC flow's persisted state. Changing keys
+ across incremental runs (renaming, swapping, growing, shrinking, or changing the type of a
+ key column) is not supported and will produce undefined behavior. To change the key set,
+ fully refresh the target table.
+
:param target: The name of the target table that receives the Auto CDC flow.
:param source: The name of the CDC source to stream from.
:param keys: The column or combination of columns that uniquely identify a row in the source \
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
new file mode 100644
index 000000000000..2b0f8e293e76
--- /dev/null
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+/**
+ * Names that AutoCDC reserves for its own use, both for internal columns it inserts during
+ * reconciliation (e.g. `${prefix}metadata`, `${prefix}winning_row`) and for internal tables it
+ * manages alongside user-defined targets (e.g. the per-target auxiliary state table).
+ *
+ * A single recognizable prefix gives a single auditable answer to "what does AutoCDC own", and
+ * lets user-defined columns and tables be unambiguously distinguished from AutoCDC-managed ones.
+ */
+private[pipelines] object AutoCdcReservedNames {
+
+ /** Common reserved-name prefix shared by AutoCDC internal columns and internal tables. */
+ val prefix: String = "__spark_autocdc_"
+}
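To make the prefix reservation concrete, here is a minimal, Spark-free sketch of a prefix check parameterized by a name resolver. The two resolver values are assumed stand-ins for Spark's case-sensitive and case-insensitive name resolution (plain `==` and `equalsIgnoreCase`); the check follows the `name.length >= reservedPrefix.length && resolver(...)` shape visible in the `Flow.scala` hunk of this commit. `ReservedPrefixSketch` is a hypothetical name.

```scala
// Hypothetical sketch of a resolver-aware reserved-prefix check.
object ReservedPrefixSketch {
  val prefix: String = "__spark_autocdc_"

  type Resolver = (String, String) => Boolean
  // Assumed stand-ins for case-sensitive / case-insensitive name resolution.
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // True if `name` begins with the reserved prefix under the given resolver.
  def hasReservedPrefix(name: String, resolver: Resolver): Boolean =
    name.length >= prefix.length && resolver(name.substring(0, prefix.length), prefix)
}
```

Under case-insensitive resolution an upper-cased `__SPARK_AUTOCDC_FOO` is rejected, while under case-sensitive resolution it is accepted, which is the behavior the `AutoCdcFlowSuite` tests in this commit check.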
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index b975e06807f5..c475377ba506 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -129,13 +129,23 @@ private[pipelines] object CaseSensitivityLabels {
}
/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
-sealed trait ScdType
+sealed trait ScdType {
+ /**
+ * Short, stable label for this SCD type. Persisted as a table property on AutoCDC flow
+ * auxiliary tables.
+ */
+ def label: String
+}
object ScdType {
/** Representation for the standard SCD1 strategy. */
- case object Type1 extends ScdType
+ case object Type1 extends ScdType {
+ override val label: String = "SCD1"
+ }
/** Representation for the standard SCD2 strategy. */
- case object Type2 extends ScdType
+ case object Type2 extends ScdType {
+ override val label: String = "SCD2"
+ }
}
/**
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index aaea3b63e9ef..0035f442fb00 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -367,19 +367,29 @@ case class Scd1BatchProcessor(
val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
microbatchDeleteVersionField > destinationUpsertVersionField
- // When the incoming upsert wins against an existing record, the entire row (all columns)
- // will be overwritten, including the CDC metadata column. We only exclude keys because
- // most merge implementations require that join columns are not being mutated, even if
- // the mutation is a no-op.
val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
val keyNames = changeArgs.keys.map(_.name)
+
+ def constructTargetColumnAssignmentsFromMicrobatch(columnName: String): (String, Column) = {
+ // Map a column in the target table to its direct equivalent in the microbatch. Note that
+ // because of target-table schema evolution during SDP dataset materialization, the
+ // microbatch's columns are always a subset of (or equal to) the target's columns.
+ val quotedCol = QuotingUtils.quoteIdentifier(columnName)
+ s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
+ }
+
+ // Most merge implementations require that join columns are not mutated, even when the
+ // mutation would be a no-op. The remaining microbatch columns (including the CDC metadata
+ // column) are overwritten outright when the incoming upsert wins.
val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
microbatchDf.columns
.filterNot(c => keyNames.exists(resolver(_, c)))
- .map { c =>
- val quotedCol = QuotingUtils.quoteIdentifier(c)
- s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
- }
+ .map(constructTargetColumnAssignmentsFromMicrobatch)
+ .toMap
+
+ val columnsToInsertOnNewKey: Map[String, Column] =
+ microbatchDf.columns
+ .map(constructTargetColumnAssignmentsFromMicrobatch)
.toMap
microbatchDf
@@ -391,7 +401,12 @@ case class Scd1BatchProcessor(
// New key: only insert upserts; deletes for absent keys are no-ops for the target table
// merge, and instead would have been inserted as tombstones into the auxiliary table.
.whenNotMatched(microbatchDeleteVersionField.isNull)
- .insertAll()
+ // When inserting a brand new row for a new key, construct column mappings from the microbatch.
+ // The microbatch's columns may be a strict subset of the target's columns -- e.g. the user
+ // narrowed `column_list` between runs, or the source DF dropped a column. The target's
+ // columns can never be a strict subset of the microbatch's, however, because SDP's schema
+ // evolution always unions old and new schemas onto the target.
+ .insert(columnsToInsertOnNewKey)
.merge()
}
@@ -417,17 +432,15 @@ case class Scd1BatchProcessor(
object Scd1BatchProcessor {
/**
- * Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed
- * dataframes must not contain any columns starting with this prefix; the invariant is
+ * Internal columns inserted by AutoCDC reconciliation. Source change-data-feed dataframes must
+ * not contain any columns starting with [[AutoCdcReservedNames.prefix]]; the invariant is
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
- private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"
-
- private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row"
- private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata"
+ private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row"
+ private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata"
- private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
- private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
+ private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
+ private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"
/** Project the delete sequence out of the CDC metadata column. */
private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column =
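The new insert path builds an explicit column mapping from the microbatch instead of calling `insertAll()`, so target columns that are absent from the microbatch are simply left unassigned. The sketch below reproduces that mapping logic over plain strings. `MergeAssignmentSketch` and its `quote` helper are hypothetical simplifications: the real code uses `QuotingUtils.quoteIdentifier` and `Column` values rather than strings.

```scala
// Hypothetical, string-only sketch of building MERGE column assignments.
object MergeAssignmentSketch {
  type Resolver = (String, String) => Boolean

  // Simplified backtick quoting: wrap in backticks, double any embedded backtick.
  def quote(name: String): String = "`" + name.replace("`", "``") + "`"

  // Maps "target.`col`" to "microbatch.`col`" for a single column.
  def assignment(target: String, col: String): (String, String) =
    s"$target.${quote(col)}" -> s"microbatch.${quote(col)}"

  // Update when the incoming upsert wins: every microbatch column except the join keys.
  def updateAssignments(
      target: String,
      microbatchCols: Seq[String],
      keys: Seq[String],
      resolver: Resolver): Map[String, String] =
    microbatchCols.filterNot(c => keys.exists(resolver(_, c))).map(assignment(target, _)).toMap

  // Insert for a new key: every microbatch column; target-only columns stay unassigned.
  def insertAssignments(target: String, microbatchCols: Seq[String]): Map[String, String] =
    microbatchCols.map(assignment(target, _)).toMap
}
```

Keys appear in the insert mapping but not in the update mapping, matching the comment that join columns should not be mutated on update.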
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
index 4affbe4637db..456edca8d1e2 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
@@ -303,6 +303,20 @@ object DatasetManager extends Logging {
context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
}
+ if (isFullRefresh) {
+ // On full refresh, drop the AutoCDC auxiliary state associated with this table (if any) so
+ // that stale delete-tracking data and table properties are not carried forward into the new
+ // table generation. We unconditionally issue the DROP for every fully-refreshed target.
+
+ // Intentionally DROP and not TRUNCATE: the auxiliary table is an internal state store
+ // that is not part of the dataflow graph, so it does not participate in regular schema
+ // evolution like user tables do. On a full refresh we want a clean recreation against
+ // the new target schema rather than carrying forward the previous generation's layout.
+
+ val auxiliaryTableId = AutoCdcAuxiliaryTable.identifier(table.identifier)
+ context.spark.sql(s"DROP TABLE IF EXISTS ${auxiliaryTableId.quotedString}")
+ }
+
// Alter the table if we need to
existingTableOpt.foreach { existingTable =>
val existingSchema = v2ColumnsToStructType(existingTable.columns())
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index 04ef8d3186c5..9f357ef026b0 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.pipelines.autocdc.{
+ AutoCdcReservedNames,
CaseSensitivityLabels,
ChangeArgs,
ColumnSelection,
@@ -271,7 +272,7 @@ class AutoCdcMergeFlow(
}
/** The DataType of the sequencing expression, derived once from the source change feed. */
- private val sequencingType: DataType =
+ private[graph] val sequencingType: DataType =
df.select(changeArgs.sequencing).schema.head.dataType
/**
@@ -335,7 +336,7 @@ class AutoCdcMergeFlow(
*/
private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
val resolver = spark.sessionState.conf.resolver
- val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix
+ val reservedPrefix = AutoCdcReservedNames.prefix
def nameContainsReservedPrefix(name: String): Boolean = {
name.length >= reservedPrefix.length && resolver(
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index 13a5621947d5..ea151830f544 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -23,12 +23,24 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
+import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRowLevelOperations, TableCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.pipelines.autocdc.{
+ AutoCdcReservedNames,
+ ChangeArgs,
+ Scd1BatchProcessor,
+ Scd1ForeachBatchHandler
+}
import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
import org.apache.spark.sql.pipelines.util.SparkSessionUtils
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ThreadUtils
/**
@@ -301,3 +313,199 @@ class SinkWrite(
.start()
}
}
+
+object AutoCdcAuxiliaryTable {
+ /**
+ * Helper for deriving the auxiliary AutoCDC catalog table identifier from a target table. If a
+ * table exists with a name matching the name derived here, it is assumed to be an AutoCDC
+ * auxiliary table that should be managed by the pipeline.
+ */
+ def identifier(destination: TableIdentifier): TableIdentifier = TableIdentifier(
+ table = s"${AutoCdcReservedNames.prefix}aux_state_${destination.table}",
+ database = destination.database,
+ catalog = destination.catalog
+ )
+
+ /**
+ * Reserved table property key set on the auxiliary table to record which SCD strategy it
+ * serves.
+ */
+ val scdTypePropertyKey: String = s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type"
+}
+
+/**
+ * Base trait for AutoCDC merge-based write flows.
+ */
+trait AutoCdcMergeWriteBase {
+ /** The spark session the AutoCDC flow is going to be planned in. */
+ protected def spark: SparkSession
+
+ /** The destination (target) table entity the AutoCDC flow will be writing to. */
+ protected def destination: Table
+
+ /** The AutoCDC flow's [[ChangeArgs]] (keys, sequencing, columnSelection, ...). */
+ protected def changeArgs: ChangeArgs
+
+ /** Full schema of the auxiliary table for this SCD type. */
+ protected def auxiliaryTableSchema: StructType
+
+ /**
+ * Idempotently create the auxiliary table for [[destination]] if it does
not already exist
+ * and return its [[TableIdentifier]].
+ *
+ * Note that this is `CREATE TABLE IF NOT EXISTS`: when the aux table
already exists, its
+ * schema is left untouched and `auxiliaryTableSchema` is ignored. For SCD1,
they keys must be
+ * invariant across executions and the CDC metadata will always be present,
so this is correct.
+ */
+ protected def createAuxiliaryTableIfNotExists(spark: SparkSession):
TableIdentifier = {
+ val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
+ // The auxiliary table inherits the target's format so MERGE semantics
line up. When the
+ // target's format is unspecified (None), omit the USING clause and fall
back to the
+ // session's default source provider.
+ val usingClause = destination.format.map(fmt => s"USING
$fmt").getOrElse("")
+ val tblPropertiesClause =
+ s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.scdTypePropertyKey}' = " +
+ s"'${changeArgs.storedAsScdType.label}')"
+ spark.sql(
+ s"""CREATE TABLE IF NOT EXISTS
+ |${auxIdent.quotedString}
+ |(${auxiliaryTableSchema.toDDL}) $usingClause $tblPropertiesClause""".stripMargin
+ )
+ auxIdent
+ }
+
+ /**
+ * Validate that the target table's underlying connector implements
+ * [[SupportsRowLevelOperations]], which is the V2 connector contract for
MERGE/UPDATE/DELETE
+ * with rewrite - all operations that the AutoCDC transformation executes.
+ */
+ protected def requireDestinationSupportsRowLevelOps(): Unit = {
+ val (catalog, v2Identifier) = resolveTableCatalog(spark,
destination.identifier)
+ val destinationTable = catalog.loadTable(v2Identifier)
+
+ if (!destinationTable.isInstanceOf[SupportsRowLevelOperations]) {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE",
+ messageParameters = Map(
+ "tableName" -> destination.identifier.quotedString,
+ "format" -> destination.format.orElse(
+ Option(
+ destinationTable.properties.get(TableCatalog.PROP_PROVIDER)
+ )
+ )
+ .getOrElse("<unknown>")
+ )
+ )
+ }
+ }
+
+ private def resolveTableCatalog(
+ spark: SparkSession,
+ ident: TableIdentifier): (TableCatalog, Identifier) = {
+ val catalogManager = spark.sessionState.catalogManager
+ val catalogPlugin = ident.catalog
+ .map(catalogManager.catalog)
+ .getOrElse(catalogManager.currentCatalog)
+ val catalog = catalogPlugin match {
+ case t: TableCatalog => t
+ case _ => throw QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+ }
+ val namespace = ident.database.getOrElse(
+ throw SparkException.internalError(
+ s"Cannot resolve table identifier ${ident.quotedString}: namespace is unspecified."
+ )
+ )
+ (catalog, Identifier.of(Array(namespace), ident.table))
+ }
+}
+
+/**
+ * A [[StreamingFlowExecution]] that applies a CDC event stream to a target [[Table]] via
+ * SCD Type 1 MERGE semantics.
+ */
+class Scd1MergeStreamingWrite(
+ val identifier: TableIdentifier,
+ val flow: AutoCdcMergeFlow,
+ val graph: DataflowGraph,
+ val updateContext: PipelineUpdateContext,
+ val checkpointPath: String,
+ val trigger: Trigger,
+ val destination: Table,
+ val sqlConf: Map[String, String]
+) extends StreamingFlowExecution with AutoCdcMergeWriteBase {
+
+ requireDestinationSupportsRowLevelOps()
+
+ override def getOrigin: QueryOrigin = flow.origin
+
+ override protected def changeArgs: ChangeArgs = flow.changeArgs
+
+ override def startStream(): StreamingQuery = {
+ val sourceChangeDataFeed = graph.reanalyzeFlow(flow).df
+
+ // The auxiliary table is created here (at flow execution) rather than during flow resolution
+ // or dataset materialization for two reasons:
+ // 1. It is an internal state store: we deliberately keep it out of the graph registration
+ // context's table set so that it is invisible to other flows and the [[DatasetManager]]
+ // will never materialize it.
+ // 2. Its format must match the target table's, which only exists after the target is
+ // materialized. Flow resolution must also stay side-effect free (e.g. for dry runs).
+ val auxiliaryTableIdentifier = createAuxiliaryTableIfNotExists(spark = updateContext.spark)
+
+ val foreachBatchHandler = Scd1ForeachBatchHandler(
+ batchProcessor = Scd1BatchProcessor(
+ changeArgs = flow.changeArgs,
+ resolvedSequencingType = flow.sequencingType
+ ),
+ auxiliaryTableIdentifier = auxiliaryTableIdentifier,
+ targetTableIdentifier = destination.identifier
+ )
+
+ sourceChangeDataFeed.writeStream
+ .queryName(displayName)
+ .option("checkpointLocation", checkpointPath)
+ .trigger(trigger)
+ .foreachBatch((batch: Dataset[Row], batchId: Long) => {
+ foreachBatchHandler.execute(batch, batchId)
+ })
+ .start()
+ }
+
+ override protected lazy val auxiliaryTableSchema: StructType =
+ // SCD1's auxiliary table is just keys + the CDC metadata struct; no user data columns. Keys
+ // come first, in `changeArgs.keys` declaration order, to anchor the per-key sequence
+ // watermark used to gate out-of-order events.
+ StructType(autoCdcKeyFields :+ cdcMetadataField)
+
+ /**
+ * AutoCDC key columns resolved out of the flow's augmented schema, in
+ * `changeArgs.keys` declaration order. Keys are guaranteed to be present in the schema
+ * because [[AutoCdcMergeFlow.schema]] validates that.
+ */
+ private lazy val autoCdcKeyFields: Seq[StructField] = {
+ val resolver = updateContext.spark.sessionState.conf.resolver
+ val targetTableSchema = flow.schema
+ flow.changeArgs.keys.map { key =>
+ targetTableSchema.fields
+ .find(field => resolver(field.name, key.name))
+ .getOrElse(
+ throw SparkException.internalError(
+ s"Key column '${key.name}' was not found in the AutoCDC flow's selected schema."
+ )
+ )
+ }
+ }
+
+ /** CDC metadata field resolved out of the flow's augmented schema. */
+ private lazy val cdcMetadataField: StructField = {
+ val resolver = updateContext.spark.sessionState.conf.resolver
+ flow.schema.fields
+ .find(field => resolver(field.name, Scd1BatchProcessor.cdcMetadataColName))
+ .getOrElse(
+ throw SparkException.internalError(
+ s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was not found in the " +
+ s"AutoCDC flow's target table schema."
+ )
+ )
+ }
+}
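The naming and DDL conventions for the auxiliary table can be sketched without Spark. In the sketch below, `TableId` is a hypothetical stand-in for `TableIdentifier`, `AuxTableSketch` is a hypothetical name, and the property key value is a placeholder: the real key is built from `PipelinesTableProperties.pipelinesPrefix`, whose value is not shown in this diff. The `DROP` statement mirrors what `DatasetManager` issues on full refresh.

```scala
// Hypothetical stand-in for Spark's TableIdentifier.
final case class TableId(table: String, database: Option[String], catalog: Option[String]) {
  private def q(part: String): String = "`" + part.replace("`", "``") + "`"
  def quotedString: String = (catalog.toSeq ++ database.toSeq :+ table).map(q).mkString(".")
}

object AuxTableSketch {
  val prefix: String = "__spark_autocdc_"
  // Placeholder: the real key is s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type".
  val scdTypePropertyKey: String = "<pipelines-prefix>autocdc.scd_type"

  // Same catalog and namespace as the target; reserved-prefixed table name.
  def identifier(dest: TableId): TableId = dest.copy(table = s"${prefix}aux_state_${dest.table}")

  def createSql(dest: TableId, schemaDdl: String, format: Option[String], scdLabel: String): String = {
    // Omitting USING falls back to the session's default data source.
    val usingClause = format.map(f => s" USING $f").getOrElse("")
    s"CREATE TABLE IF NOT EXISTS ${identifier(dest).quotedString} ($schemaDdl)$usingClause " +
      s"TBLPROPERTIES ('$scdTypePropertyKey' = '$scdLabel')"
  }

  // Issued on full refresh so the next run recreates the aux table from scratch.
  def dropSql(dest: TableId): String = s"DROP TABLE IF EXISTS ${identifier(dest).quotedString}"
}
```

Deriving the auxiliary name purely from the target identifier means no extra bookkeeping is needed to find it again on later runs or on full refresh.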
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
index 29e2da4a5e13..8251780524a2 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.pipelines.graph
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.pipelines.autocdc.ScdType
import org.apache.spark.sql.streaming.Trigger
/**
@@ -73,10 +75,30 @@ class FlowPlanner(
trigger = triggerFor(sf),
checkpointPath = flowMetadata.latestCheckpointLocation
)
- case _ =>
- throw new UnsupportedOperationException(
- s"Unsupported destination type: ${output.getClass.getSimpleName} for " +
- s"streaming flow ${sf.identifier} (${flow.destinationIdentifier})"
+ case _ => unsupportedDestinationType(sf, output)
+ }
+ case acmf: AutoCdcMergeFlow =>
+ acmf.changeArgs.storedAsScdType match {
+ case ScdType.Type1 =>
+ val flowMetadata = FlowSystemMetadata(updateContext, acmf, graph)
+ output match {
+ case o: Table =>
+ new Scd1MergeStreamingWrite(
+ identifier = acmf.identifier,
+ flow = acmf,
+ graph = graph,
+ updateContext = updateContext,
+ checkpointPath = flowMetadata.latestCheckpointLocation,
+ trigger = triggerFor(acmf),
+ destination = o,
+ sqlConf = acmf.sqlConf
+ )
+ case _ => unsupportedDestinationType(acmf, output)
+ }
+ case ScdType.Type2 =>
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+ messageParameters = Map.empty
)
}
case _ =>
@@ -85,4 +107,11 @@ class FlowPlanner(
)
}
}
+
+ private def unsupportedDestinationType(flow: ResolvedFlow, output: Output): Nothing = {
+ throw new UnsupportedOperationException(
+ s"Unsupported destination type: ${output.getClass.getSimpleName} for " +
+ s"flow ${flow.identifier} writing to ${flow.destinationIdentifier}"
+ )
+ }
}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index 8d365906559b..65eafd6c7dcc 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -409,7 +409,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
"Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " +
"construction"
) {
- val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+ val conflictingName = s"${AutoCdcReservedNames.prefix}foo"
val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
checkError(
@@ -422,7 +422,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
"caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
"columnName" -> conflictingName,
"schemaName" -> "changeDataFeed",
- "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+ "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
)
)
}
@@ -435,7 +435,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
// from any ChangeArgs path still fails at construction with a different error. The
// reservation is on the name itself, not on its presence in the source feed.
val cleanSourceDf = threeColumnSourceDf()
- val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+ val reservedName = s"${AutoCdcReservedNames.prefix}foo"
val keysEx = intercept[AnalysisException] {
newAutoCdcMergeFlow(
@@ -487,7 +487,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
"caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
"columnName" -> Scd1BatchProcessor.cdcMetadataColName,
"schemaName" -> "changeDataFeed",
- "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+ "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
)
)
}
@@ -497,7 +497,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val conflictingName =
- s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+ s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT)
val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
checkError(
@@ -510,7 +510,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
"caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
"columnName" -> conflictingName,
"schemaName" -> "changeDataFeed",
- "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+ "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
)
)
}
@@ -524,7 +524,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
// `spark.sql.caseSensitive`, consistent with the schema-augmentation logic in this class.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val nonConflictingName =
- s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+ s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT)
val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType)
// No exception expected: construction succeeds.
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
new file mode 100644
index 000000000000..5e2286a4fd56
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.autocdc.{
+ ChangeArgs,
+ ColumnSelection,
+ Scd1BatchProcessor,
+ ScdType,
+ UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.common.RunState
+import org.apache.spark.sql.pipelines.logging.RunProgress
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Shared helpers for AutoCDC end-to-end graph-execution test suites.
+ */
+trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach {
+ self: Suite with ExecutionTest with SharedSparkSession =>
+
+ /** v2 catalog name registered for AutoCDC E2E tests. Tests qualify tables as `cat.ns1.t`. */
+ protected val catalog: String = "cat"
+
+ /** Namespace under [[catalog]] used by AutoCDC E2E tests. */
+ protected val namespace: String = "ns1"
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ spark.conf.set(
+ s"spark.sql.catalog.$catalog",
+ classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName
+ )
+ // Disable per-flow retries so failure-path tests (e.g. INCOMPATIBLE_DATA) surface the
+ // AnalysisException after the first attempt instead of going through the default 2 retries,
+ // which would otherwise emit duplicate FAILED events and inflate test runtime without
+ // changing the asserted outcome.
+ spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "0")
+ spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace")
+ }
+
+ override protected def afterEach(): Unit = {
+ SharedTablesInMemoryRowLevelOperationTableCatalog.reset()
+ spark.sessionState.catalogManager.reset()
+ spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalog")
+ spark.sessionState.conf.unsetConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key)
+ super.afterEach()
+ }
+
+ /**
+ * Run a pipeline to completion. If the run emitted a [[RunProgress]] event with state
+ * [[RunState.FAILED]], collect every error from the event buffer and throw a single
+ * exception listing them, so that test failures surface meaningful stack traces instead of
+ * generic "test exited normally but flow failed" errors.
+ */
+ protected def runPipeline(ctx: TestGraphRegistrationContext): Unit = {
+ val updateCtx = TestPipelineUpdateContext(spark, ctx.toDataflowGraph, storageRoot)
+ updateCtx.pipelineExecution.runPipeline()
+ updateCtx.pipelineExecution.awaitCompletion()
+
+ if (updateCtx.eventBuffer.getEvents.exists(_.details == RunProgress(RunState.FAILED))) {
+ val errors = updateCtx.eventBuffer.getEvents.flatMap(_.error)
+ val ex = new RuntimeException(
+ s"Pipeline run failed with ${errors.size} error(s):\n" +
+ errors.map { e =>
+ val stackSnippet = e.getStackTrace
+ .map(f => s" at $f")
+ .mkString("\n")
+ s" ${e.getClass.getSimpleName}: ${e.getMessage}\n$stackSnippet"
+ }.mkString("\n")
+ )
+ errors.foreach(ex.addSuppressed)
+ throw ex
+ }
+ }
+
+ /**
+ * Walk `failure` and every exception directly suppressed by it ([[Throwable#getSuppressed]]),
+ * following each one's [[Throwable#getCause]] chain, searching for the first [[SparkThrowable]]
+ * whose [[SparkThrowable#getCondition]] equals `condition`, then run [[checkError]] against
+ * that exception with all of its other arguments propagated through.
+ */
+ protected def checkErrorInPipelineFailure(
+ failure: Throwable,
+ condition: String,
+ sqlState: Option[String] = None,
+ parameters: Map[String, String] = Map.empty,
+ matchPVals: Boolean = false,
+ queryContext: Array[ExpectedContext] = Array.empty): Unit = {
+
+ def causeChain(t: Throwable): Iterator[Throwable] =
+ Iterator.iterate[Throwable](t)(_.getCause).takeWhile(_ != null)
+
+ def reachable: Iterator[Throwable] =
+ (Iterator(failure) ++ failure.getSuppressed.iterator).flatMap(causeChain)
+
+ val matched = reachable.collectFirst {
+ case t: SparkThrowable if t.getCondition == condition => t
+ }
+ assert(
+ matched.isDefined,
+ s"Expected a SparkThrowable with condition '$condition' reachable from the runPipeline " +
+ s"failure chain, got top-level: ${failure.getMessage}; chain:\n" +
+ reachable
+ .map(t => s" ${t.getClass.getSimpleName}: ${t.getMessage}")
+ .mkString("\n")
+ )
+ checkError(
+ exception = matched.get,
+ condition = condition,
+ sqlState = sqlState,
+ parameters = parameters,
+ matchPVals = matchPVals,
+ queryContext = queryContext
+ )
+ }
+
+ /**
+ * DDL fragment for the AutoCDC metadata column appended to every SCD1 target table. Use
+ * inside a `CREATE TABLE` statement, for example:
+ * `CREATE TABLE t (id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)`
+ *
+ * Assumes sequence type is BIGINT (Long).
+ */
+ protected val cdcMetadataDdl: String = {
+ val col = Scd1BatchProcessor.cdcMetadataColName
+ val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
+ val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
+ s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL"
+ }
+
+ /**
+ * Insert a pre-existing row into a target table, populating the CDC metadata struct so the
+ * row looks as if a previous AutoCDC run upserted it at sequencing version `sequence`.
+ *
+ * @param table Fully-qualified table name (catalog.schema.table).
+ * @param colValues Comma-separated SQL literals for the user-defined columns, in declared
+ * order, excluding the trailing CDC metadata column.
+ * @param sequence Value to seed `_cdc_metadata.upsertSequence` with. The
+ * `deleteSequence` field is left NULL.
+ */
+ protected def insertPreloadedRow(table: String, colValues: String, sequence: Long): Unit = {
+ val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
+ val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
+ spark.sql(
+ s"INSERT INTO $table SELECT $colValues, " +
+ s"named_struct('$del', CAST(NULL AS BIGINT), '$ups', CAST($sequence AS BIGINT))"
+ )
+ }
+
+ /** Catalog identifier of the AutoCDC auxiliary table for `targetTableName`. */
+ protected def auxTableNameFor(targetTableName: String): String = {
+ val targetIdent = fullyQualifiedIdentifier(targetTableName, Some(catalog), Some(namespace))
+ AutoCdcAuxiliaryTable.identifier(targetIdent).unquotedString
+ }
+
+ /**
+ * Construct an [[AutoCdcFlow]] targeting `catalog.namespace.${target}` from the given
+ * query and CDC knobs.
+ */
+ protected def autoCdcFlow(
+ name: String,
+ target: String,
+ query: FlowFunction,
+ keys: Seq[String],
+ sequencing: Column,
+ columnSelection: Option[ColumnSelection] = None,
+ deleteCondition: Option[Column] = None,
+ scdType: ScdType = ScdType.Type1
+ ): AutoCdcFlow = AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier(name, Some(catalog), Some(namespace)),
+ destinationIdentifier = fullyQualifiedIdentifier(target, Some(catalog), Some(namespace)),
+ func = query,
+ queryContext = QueryContext(
+ currentCatalog = Some(catalog),
+ currentDatabase = Some(namespace)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = keys.map(UnqualifiedColumnName(_)),
+ sequencing = sequencing,
+ columnSelection = columnSelection,
+ deleteCondition = deleteCondition,
+ storedAsScdType = scdType
+ )
+ )
+
+ /** Build a target row's `_cdc_metadata` struct value. */
+ protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row =
+ Row(deleteSeq.orNull, upsertSeq.orNull)
+}
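`checkErrorInPipelineFailure` searches the top-level failure plus each exception suppressed into it, following every `getCause` chain. The same traversal, stripped of Spark types, can be sketched as below; `FailureChainSearch` and `findInFailureChain` are hypothetical names, and a generic predicate stands in for the `SparkThrowable#getCondition` match:

```scala
// Hypothetical standalone version of the traversal in `checkErrorInPipelineFailure`.
object FailureChainSearch {

  /** `t`, its cause, that cause's cause, ... (assumes the cause chain is acyclic). */
  private def causeChain(t: Throwable): Iterator[Throwable] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null)

  /** First throwable reachable from `failure` or its direct suppressed list matching `p`. */
  def findInFailureChain(failure: Throwable)(p: Throwable => Boolean): Option[Throwable] =
    (Iterator(failure) ++ failure.getSuppressed.iterator).flatMap(causeChain).find(p)
}
```

Since `runPipeline` attaches every collected error via `addSuppressed`, looking through the suppressed list, and not only the top-level cause chain, is what lets a test locate the original error condition.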
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
new file mode 100644
index 000000000000..50ff60556a73
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+ ColumnSelection,
+ Scd1BatchProcessor,
+ UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering the durability of AutoCDC's auxiliary table across pipeline runs:
+ * the per-key sequence watermarks recorded in the auxiliary table must persist between
+ * incremental runs, and the auxiliary table must be transparently recreated if it is
+ * deleted out-of-band.
+ */
+class AutoCdcScd1AuxiliaryTableDurabilitySuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("a higher-sequence event in a later pipeline run correctly upserts the row") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Single MemoryStream reused across both pipeline runs so the streaming checkpoint can
+ // resume cleanly.
+ val changeDataFeedStream = MemoryStream[(Int, String, Long)]
+ def buildGraphRegistrationContext(): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(
+ changeDataFeedStream.toDF().toDF("id", "name", "version")
+ ),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ // Run #1: insert id=1 at seq=1.
+ changeDataFeedStream.addData((1, "alice", 1L))
+ runPipeline(buildGraphRegistrationContext())
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Run #2: upsert id=1 at seq=2 (must replace) and insert id=2 at seq=1 (new key).
+ // The auxiliary table from run #1 persists and continues to gate seq comparisons.
+ changeDataFeedStream.addData((1, "alice2", 2L), (2, "bob", 1L))
+ runPipeline(buildGraphRegistrationContext())
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice2", 2L, cdcMeta(None, Some(2L))),
+ Row(2, "bob", 1L, cdcMeta(None, Some(1L)))
+ )
+ )
+ }
+
+ test("an event with a sequence lower than what was applied in a prior pipeline run " +
+ "is suppressed") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+ val stream = MemoryStream[(Int, String, Long, Boolean)]
+ def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ deleteCondition = Some(functions.col("is_delete") === true),
+ columnSelection = Some(ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("is_delete"))
+ ))
+ ))
+ }
+
+ // Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the watermark.
+ stream.addData((1, "alice", 10L, true))
+ runPipeline(buildCtx())
+ checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty)
+
+ // Run #2: late upsert at seq=5 (< the persisted seq=10 watermark). Must be rejected.
+ stream.addData((1, "stale", 5L, false))
+ runPipeline(buildCtx())
+
+ // Auxiliary table watermark from run #1 (seq=10) should keep rejecting the seq=5 event.
+ checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty)
+ }
+
+ test("the auxiliary table places the AutoCDC key column first, ahead of any non-key " +
+ "source columns") {
+ val session = spark
+ import session.implicits._
+
+ // Source DF column order is (name, id, version): the AutoCDC key column `id` does NOT
+ // appear first in the source DF. The auxiliary table must still write `id` as its
+ // leading column.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(name STRING, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(String, Int, Long)]
+ stream.addData(("alice", 1, 1L))
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ val auxSchema = spark.table(auxTableNameFor("target")).schema
+
+ // The auxiliary table only contains keys and the metadata column, hence neither "name" nor
+ // "version" should be included.
+ assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName))
+ }
+
+ test("the auxiliary table preserves the user's declared key order, independent of the " +
+ "source DataFrame and target table column orders") {
+ val session = spark
+ import session.implicits._
+
+ // Source DF: (value, id, region, version). Target table: (value, id, region, version,
+ // _cdc_metadata) -- same ordering as the source. The user, however, declares
+ // `keys = Seq("region", "id")` -- the OPPOSITE order from how those columns appear in
+ // both the source DF and the target. The auxiliary table should honor the user's
+ // declared key order, not either physical column ordering, so subsequent runs
+ // positionally compare keys against the same recorded layout.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(value STRING, id INT NOT NULL, region STRING NOT NULL, " +
+ s"version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(String, Int, String, Long)]
+ stream.addData(("v", 1, "us", 1L))
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("value", "id", "region", "version")),
+ keys = Seq("region", "id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ val auxSchema = spark.table(auxTableNameFor("target")).schema
+ assert(auxSchema.fieldNames.toSeq ==
+ Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+ }
+
+ test("if the AutoCDC auxiliary table is dropped between runs, it is transparently " +
+ "recreated") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+ val stream = MemoryStream[(Int, Long)]
+ def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ stream.addData((1, 1L))
+ runPipeline(buildCtx())
+ assert(spark.catalog.tableExists(auxTableNameFor("target")))
+
+ // Manually drop the auxiliary table.
+ spark.sql(s"DROP TABLE ${auxTableNameFor("target")}")
+ assert(!spark.catalog.tableExists(auxTableNameFor("target")))
+
+ stream.addData((1, 2L))
+ runPipeline(buildCtx())
+
+ // The dropped auxiliary table must be transparently recreated.
+ assert(spark.catalog.tableExists(auxTableNameFor("target")))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, 2L, cdcMeta(None, Some(2L))))
+ )
+ }
+
+}
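The durability tests above assert a per-key sequencing rule: an event is applied only if its sequence is higher than the highest sequence already recorded for that key, and that record survives across runs, including for keys whose latest event was a delete. A simplified in-memory model of that rule follows. It is not `Scd1BatchProcessor`: it folds the target's `_cdc_metadata` and the auxiliary table into a single `watermarks` map, and it treats an event whose sequence equals the recorded one as stale, which is an assumption:

```scala
// Simplified, hypothetical model of the SCD1 per-key sequencing rule exercised above.
// The real engine persists this state in the target's `_cdc_metadata` column and in
// the AutoCDC auxiliary table; this model keeps it in two immutable maps.
object Scd1WatermarkModel {

  /** One change-data-feed record; `isDelete` plays the role of the delete condition. */
  final case class Event(key: Int, value: String, seq: Long, isDelete: Boolean)

  /** Live target rows (key -> (value, seq)) plus the highest sequence applied per key. */
  final case class State(
      target: Map[Int, (String, Long)] = Map.empty,
      watermarks: Map[Int, Long] = Map.empty)

  /** Apply one event. Events at or below the key's recorded sequence are dropped as stale. */
  def applyEvent(state: State, e: Event): State =
    if (state.watermarks.get(e.key).exists(_ >= e.seq)) {
      state
    } else {
      val target =
        if (e.isDelete) state.target - e.key
        else state.target.updated(e.key, (e.value, e.seq))
      // Deletes also advance the watermark, so a later low-sequence upsert stays suppressed.
      State(target, state.watermarks.updated(e.key, e.seq))
    }

  /** Apply one run's events on top of the state left behind by earlier runs. */
  def applyRun(state: State, events: Seq[Event]): State =
    events.foldLeft(state)(applyEvent)
}
```

Because only the maximum sequence per key matters, the order in which a run's events are applied does not change the final state, apart from the tie-breaking assumption above.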
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala
new file mode 100644
index 000000000000..94ba7e20aed1
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+ ColumnSelection,
+ UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's full-refresh semantics: full refresh must wipe both the
+ * target rows and the AutoCDC auxiliary table for the refreshed targets, and must leave
+ * non-refreshed targets untouched in selective-refresh mode.
+ */
+class AutoCdcScd1FullRefreshSuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("full refresh wipes target rows and the auxiliary table for the refreshed flow") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Run #1: populate target + auxiliary table.
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "alice", 5L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+ assert(
+ spark.catalog.tableExists(auxTableNameFor("target")),
+ "Auxiliary table should exist after first run"
+ )
+
+ // Run #2 (full refresh): auxiliary table should be dropped by DatasetManager, target
+ // truncated. The new run brings only id=2 at seq=1.
+ val stream2 = MemoryStream[(Int, String, Long)]
+ stream2.addData((2, "bob", 1L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ val updateCtx = TestPipelineUpdateContext(
+ spark,
+ ctx2.toDataflowGraph,
+ storageRoot,
+ fullRefreshTables = AllTables
+ )
+ updateCtx.pipelineExecution.runPipeline()
+ updateCtx.pipelineExecution.awaitCompletion()
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(2, "bob", 1L, cdcMeta(None, Some(1L))))
+ )
+ }
+
+ test("after a full refresh, an event with a sequence below the previous run's " +
+ "watermark now lands") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Run #1: delete at seq=10 sets a high watermark in the auxiliary table.
+ val stream1 = MemoryStream[(Int, String, Long, Boolean)]
+ stream1.addData((1, "alice", 10L, true))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version", "is_delete")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ deleteCondition = Some(functions.col("is_delete") === true),
+ columnSelection = Some(ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("is_delete"))
+ ))
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Run #2 (full refresh): auxiliary table is dropped, watermark reset. seq=5 should
+ // now land.
+ val stream2 = MemoryStream[(Int, String, Long, Boolean)]
+ stream2.addData((1, "fresh", 5L, false))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version", "is_delete")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ deleteCondition = Some(functions.col("is_delete") === true),
+ columnSelection = Some(ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("is_delete"))
+ ))
+ ))
+ }
+ val updateCtx = TestPipelineUpdateContext(
+ spark,
+ ctx2.toDataflowGraph,
+ storageRoot,
+ fullRefreshTables = AllTables
+ )
+ updateCtx.pipelineExecution.runPipeline()
+ updateCtx.pipelineExecution.awaitCompletion()
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "fresh", 5L, cdcMeta(None, Some(5L))))
+ )
+ }
+
+ test("selective full refresh wipes only the requested target's auxiliary state") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_a " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_b " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // streamA is replaced across runs because t_a is full-refreshed in run #2 (its streaming
+ // checkpoint is reset by full-refresh, so a fresh source is fine and matches the user-visible
+ // semantics). streamB is reused across runs because t_b is NOT full-refreshed -- its
+ // streaming checkpoint must resume against the same MemoryStream instance, otherwise the
+ // seq=5 assertion below could pass for the wrong reason (the source never produced seq=5
+ // in run #2 instead of the aux watermark suppressing it).
+ val streamA1 = MemoryStream[(Int, Long)]
+ val streamB = MemoryStream[(Int, Long)]
+ streamA1.addData((1, 10L))
+ streamB.addData((1, 10L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+ registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_a",
+ target = "t_a",
+ query = dfFlowFunc(streamA1.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ registerFlow(autoCdcFlow(
+ name = "flow_b",
+ target = "t_b",
+ query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Run #2: full refresh ONLY on t_a; t_b's auxiliary state must persist.
+ val streamA2 = MemoryStream[(Int, Long)]
+ streamA2.addData((1, 5L)) // would have been suppressed pre-refresh; now wins
+ streamB.addData((1, 5L)) // must be suppressed (auxiliary table retains seq=10)
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+ registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_a",
+ target = "t_a",
+ query = dfFlowFunc(streamA2.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ registerFlow(autoCdcFlow(
+ name = "flow_b",
+ target = "t_b",
+ query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ val updateCtx = TestPipelineUpdateContext(
+ spark,
+ ctx2.toDataflowGraph,
+ storageRoot,
+ fullRefreshTables = SomeTables(Set(
+ fullyQualifiedIdentifier("t_a", Some(catalog), Some(namespace))
+ ))
+ )
+ updateCtx.pipelineExecution.runPipeline()
+ updateCtx.pipelineExecution.awaitCompletion()
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_a"),
+ Seq(Row(1, 5L, cdcMeta(None, Some(5L))))
+ )
+ // t_b: pre-existing seq=10 row still wins; the seq=5 event is dropped.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_b"),
+ Seq(Row(1, 10L, cdcMeta(None, Some(10L))))
+ )
+ }
+}
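The selective full-refresh test relies on refresh being scoped per target: `t_a` loses both its rows and its recorded sequences, while `t_b` keeps them, so the same seq=5 event lands in one target and is dropped in the other. A hypothetical model of that scoping; the names are illustrative, and targets are keyed by plain strings rather than `fullyQualifiedIdentifier` values:

```scala
// Hypothetical model of full-refresh scoping; not the DatasetManager implementation.
object FullRefreshModel {

  /** Per-target state: live rows (key -> seq) and the highest sequence applied per key. */
  final case class TargetState(rows: Map[Int, Long], watermarks: Map[Int, Long])

  val Empty: TargetState = TargetState(Map.empty, Map.empty)

  /** Upsert `key` at `seq` unless a sequence at or above `seq` was already applied. */
  def upsert(state: TargetState, key: Int, seq: Long): TargetState =
    if (state.watermarks.get(key).exists(_ >= seq)) state
    else TargetState(state.rows.updated(key, seq), state.watermarks.updated(key, seq))

  /** Full refresh: wipe rows AND watermarks of every refreshed target; keep the rest. */
  def fullRefresh(
      targets: Map[String, TargetState],
      refreshed: Set[String]): Map[String, TargetState] =
    targets.map { case (name, state) =>
      name -> (if (refreshed.contains(name)) Empty else state)
    }
}
```

Clearing the watermarks together with the rows is what lets the seq=5 event land in `t_a`; in this model, wiping only the rows would still suppress it.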
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
new file mode 100644
index 000000000000..32f34923c480
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests that exercise interactions between separate AutoCDC pipelines (i.e.
+ * distinct [[DataflowGraph]] / [[TestPipelineUpdateContext]] invocations) sharing the same
+ * v2 catalog. These complement the single-pipeline AutoCDC suites by validating the
+ * boundary semantics between independently-deployed pipelines.
+ *
+ * Each test constructs two graphs and runs them sequentially. In real deployments these
+ * could be two different pipeline definitions writing into the same metastore; the tests
+ * here verify that AutoCDC's per-target catalog state (target table, auxiliary table,
+ * schema invariants) behaves correctly across these pipeline boundaries.
+ */
+class AutoCdcScd1MultiPipelineSuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("two AutoCDC pipelines targeting separate tables maintain independent target and " +
+ "auxiliary tables") {
+ val session = spark
+ import session.implicits._
+
+ // Two distinct target tables created up-front.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_a " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_b " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1 only knows about `t_a`. Its auxiliary table
+ // cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's `t_b`.
+ val streamA = MemoryStream[(Int, String, Long)]
+ streamA.addData((1, "alice", 100L))
+ val ctxA = new TestGraphRegistrationContext(spark) {
+ registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_a",
+ target = "t_a",
+ query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctxA)
+
+ // Pipeline #2 only knows about `t_b`. Uses a deliberately *lower* sequence to verify
+ // the watermark from pipeline #1's auxiliary table (seq=100) does not leak into
+ // pipeline #2.
+ val streamB = MemoryStream[(Int, String, Long)]
+ streamB.addData((9, "bob", 1L))
+ val ctxB = new TestGraphRegistrationContext(spark) {
+ registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_b",
+ target = "t_b",
+ query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctxB)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_a"),
+ Seq(Row(1, "alice", 100L, cdcMeta(None, Some(100L))))
+ )
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_b"),
+ Seq(Row(9, "bob", 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Each target has its own auxiliary table; no cross-contamination.
+ assert(spark.catalog.tableExists(auxTableNameFor("t_a")))
+ assert(spark.catalog.tableExists(auxTableNameFor("t_b")))
+ }
+
+ test("a downstream pipeline can read an AutoCDC target written by a different pipeline " +
+ "without observing the CDC metadata column") {
+ val session = spark
+ import session.implicits._
+
+ // Pipeline #1 writes into target `src` via AutoCDC.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.src " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData((1, "alice", 1L), (2, "bob", 1L))
+ val ctxWriter = new TestGraphRegistrationContext(spark) {
+ registerTable("src", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "writer",
+ target = "src",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctxWriter)
+
+ // Pipeline #2 is a regular materialized view that selects the user-data columns from
+ // `src` (a different graph entirely). It must observe the merged AutoCDC rows and be
+ // able to ignore the metadata column without it polluting downstream consumers.
+ val ctxReader = new TestGraphRegistrationContext(spark) {
+ registerMaterializedView(
+ "downstream_mv",
+ query = dfFlowFunc(
+ spark.read.table(s"$catalog.$namespace.src").select("id", "name", "version")
+ )
+ )
+ }
+ runPipeline(ctxReader)
+
+ checkAnswer(
+ spark.table(fullyQualifiedIdentifier("downstream_mv").toString),
+ Seq(Row(1, "alice", 1L), Row(2, "bob", 1L))
+ )
+ }
+
+ test("two AutoCDC pipelines targeting the same table with identical key and data " +
+ "schemas merge into a shared target table") {
+ val session = spark
+ import session.implicits._
+
+ // Target table is created once up-front; both pipelines target it with the same
+ // AutoCDC `keys` and the same source-DF data schema. The two pipelines have distinct
+ // flow names ("flow_v1" / "flow_v2") so they own independent streaming checkpoints,
+ // but share the target table and its auxiliary table.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.shared_target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1: inserts rows with id=1 and id=2 at version=1.
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "alice", 1L), (2, "bob", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "shared_target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Sanity-check pipeline #1's effect before pipeline #2 runs.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.shared_target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+ Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+ )
+ )
+
+ // Pipeline #2: updates id=2 (existing key) to a higher sequence and inserts id=3
+ // (new key). id=1 is untouched and must survive into the final target unchanged.
+ val stream2 = MemoryStream[(Int, String, Long)]
+ stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "shared_target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx2)
+
+ // Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2,
+ // id=3 freshly inserted by pipeline #2.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.shared_target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+ Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L))),
+ Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+ )
+ )
+
+ // The auxiliary table for the shared target is itself shared across both pipelines.
+ assert(spark.catalog.tableExists(auxTableNameFor("shared_target")))
+ }
+
+ test("two AutoCDC pipelines targeting the same table with the same key but different " +
+ "data columns evolve the shared target schema") {
+ val session = spark
+ import session.implicits._
+
+ // Target is created up-front with pipeline #1's schema only; pipeline #2 brings a new
+ // top-level nullable `age` column that the dataset materialization layer is expected
+ // to schema-merge into the target.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.shared_target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Pipeline #1: source DF schema is (id, name, version); inserts id=1 and id=2.
+ val stream1 = MemoryStream[(Int, String, Long)]
+ stream1.addData((1, "alice", 1L), (2, "bob", 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v1",
+ target = "shared_target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Sanity-check pipeline #1's state before schema evolution kicks in.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.shared_target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+ Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+ )
+ )
+
+ // Pipeline #2: source DF schema is (id, name, age, version). The new nullable `age` column
+ // should be added to the target by dataset materialization; pipeline #1's untouched id=1 row
+ // is backfilled to NULL.
+ val stream2 = MemoryStream[(Int, String, Option[Int], Long)]
+ stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_v2",
+ target = "shared_target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx2)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.shared_target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null),
+ Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25),
+ Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30)
+ )
+
+ // Pipeline #1 runs again with its original (id, name, version) schema. The evolved
+ // target schema with `age` must persist: id=1's update leaves age untouched, id=4 is
+ // inserted with age=NULL, and pipeline #2's id=2/id=3 rows are unchanged.
+ stream1.addData((1, "alice-v2", 2L), (4, "dave", 1L))
+ runPipeline(ctx1)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.shared_target"),
+ Seq(
+ Row(1, "alice-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), null),
+ Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25),
+ Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30),
+ Row(4, "dave", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null)
+ )
+ )
+ }
+
+}
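The shared-target test above expects pipeline #2's new `age` column to be appended after the existing columns (including the CDC metadata column). The schema-evolution suite that follows relies on the same additive merge. The dependency-free Scala model below is a minimal sketch of that contract. It assumes only the behavior these test comments describe: existing columns keep their position, columns that appear only in the new schema are appended, nothing is ever dropped, names are compared case-sensitively, and any leaf type change is rejected. All types and functions in it (`StructType`, `Field`, `ArrayType`, `merge`, `mergeType`) are hypothetical stand-ins. They are not Spark's `org.apache.spark.sql.types` classes, nor `SchemaMergingUtils.mergeSchemas`.

```scala
// Hypothetical, simplified model of an additive schema merge. These types are
// stand-ins, NOT Spark's org.apache.spark.sql.types classes, and `merge` is NOT
// SchemaMergingUtils.mergeSchemas; it only mirrors the behavior the tests assert.
object AdditiveSchemaMergeSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class StructType(fields: Seq[Field]) extends DataType
  final case class Field(name: String, dataType: DataType, nullable: Boolean = true)

  final class IncompatibleTypeException(left: DataType, right: DataType)
      extends RuntimeException(s"CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE: $left vs $right")

  /** Merges `right` (the flow's output schema) into `left` (the persisted target schema). */
  def merge(left: StructType, right: StructType): StructType = {
    // Plain string equality on names, i.e. a case-sensitive comparison.
    val rightByName = right.fields.map(f => f.name -> f).toMap
    // Every existing field keeps its position; a field missing from `right` is kept as-is.
    val kept = left.fields.map { l =>
      rightByName.get(l.name) match {
        case Some(r) => Field(l.name, mergeType(l.dataType, r.dataType), l.nullable || r.nullable)
        case None    => l
      }
    }
    // Fields that only exist in `right` are appended at the end, in `right`'s order.
    val leftNames = left.fields.map(_.name).toSet
    StructType(kept ++ right.fields.filterNot(f => leftNames.contains(f.name)))
  }

  // Recurses into structs and array elements; any other type change is rejected,
  // including a widening such as INT -> BIGINT.
  private def mergeType(l: DataType, r: DataType): DataType = (l, r) match {
    case (ls: StructType, rs: StructType) => merge(ls, rs)
    case (ArrayType(le), ArrayType(re))   => ArrayType(mergeType(le, re))
    case _ if l == r                      => l
    case _                                => throw new IncompatibleTypeException(l, r)
  }

  def main(args: Array[String]): Unit = {
    val target = StructType(Seq(
      Field("id", IntType, nullable = false),
      Field("name", StringType),
      Field("version", LongType, nullable = false)))
    val withAge = StructType(Seq(
      Field("id", IntType, nullable = false),
      Field("name", StringType),
      Field("age", IntType),
      Field("version", LongType, nullable = false)))
    // `age` is appended after the existing columns: id, name, version, age
    println(merge(target, withAge).fields.map(_.name).mkString(", "))
  }
}
```

Under this model, a column that disappears from the source (or a nested field such as `value.b.c`) stays in the merged schema. That fits the later suite's observation that nested-field drops fail in the v2 writer (which cannot find data for the target column), not in the merge. A case-only rename (`Value` vs `value`) yields two separate columns.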
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
new file mode 100644
index 000000000000..4c20b21ad57a
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.autocdc.{
+ ColumnSelection,
+ UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's interaction with schema evolution across pipeline runs. The
+ * suite documents the supported additive cases (new top-level columns, new nested fields
+ * in array-of-struct, broadening / narrowing column selection) and the cases that fail
+ * loudly today (subtractive nested evolution, type-incompatible changes, case-only
+ * renames).
+ *
+ * These behaviors are largely inherited from the lower layers (`SchemaMergingUtils` for
+ * schema merge, the v2 writer's column-resolution layer for nested-field handling) rather
+ * than implemented in AutoCDC itself; the tests here serve as the contract for AutoCDC's
+ * observable behavior on top of those layers.
+ */
+class AutoCdcScd1SchemaEvolutionSuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("a nullable non-key column merges correctly with mixed NULL and non-NULL values") {
+ val session = spark
+ import session.implicits._
+
+ // Single MemoryStream with `email` as nullable from the start. Run #1 emits a row with
+ // a NULL email; run #2 emits an upsert with a non-NULL email.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, String, Option[String], Long)]
+ def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ // Run #1: insert with NULL email.
+ stream.addData((1, "alice", None, 1L))
+ runPipeline(buildCtx())
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", null, 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Run #2: upsert with non-NULL email at higher seq replaces the row.
+ stream.addData((1, "alice2", Some("[email protected]"), 2L))
+ runPipeline(buildCtx())
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice2", "[email protected]", 2L, cdcMeta(None, Some(2L))))
+ )
+ }
+
+ test("widening a non-key column's type between runs fails with " +
+ "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+ val session = spark
+ import session.implicits._
+
+ // Changing a non-key column's type between pipeline runs is rejected by
+ // `SchemaMergingUtils` with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE even when the new type
+ // is strictly wider. Users must full-refresh the target to change column types.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, age INT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Int, Long)]
+ stream1.addData((1, 30, 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Run #2: widen `age` from Int to Long.
+ val stream2 = MemoryStream[(Int, Long, Long)]
+ stream2.addData((1, 31L, 2L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+ sqlState = Some("42825"),
+ // `left` is the persisted (run #1) INT type; `right` is run #2's widened BIGINT.
+ parameters = Map(
+ "left" -> "\"INT\"",
+ "right" -> "\"BIGINT\""
+ )
+ )
+ }
+
+ test("narrowing a non-key column's type between runs fails with " +
+ "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+ val session = spark
+ import session.implicits._
+
+ // Mirror image of the widening test above: changing a non-key column's type between
+ // pipeline runs is rejected by `SchemaMergingUtils` with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
+ // even when the new type is strictly narrower.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, payload BIGINT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long, Long)]
+ stream1.addData((1, 100L, 1L))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Run #2: narrow `payload` from Long (BIGINT) to Int (INT).
+ val stream2 = MemoryStream[(Int, Int, Long)]
+ stream2.addData((1, 5, 2L))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+ sqlState = Some("42825"),
+ // `left` is the persisted (run #1) BIGINT type; `right` is run #2's narrowed INT.
+ parameters = Map(
+ "left" -> "\"BIGINT\"",
+ "right" -> "\"INT\""
+ )
+ )
+ }
+
+ test("a new top-level nullable column appearing in the source DF between runs is " +
+ "added to the target") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Single MemoryStream of (id, name, email, version) shared across runs so the streaming
+ // checkpoint can resume cleanly. Run #1's flow drops `email` so the source's resolved DF
+ // schema is 3 columns; run #2 keeps all 4. The MemoryStream's underlying tuple schema is
+ // unchanged (only the downstream projection differs), so the source identity that the
+ // OffsetSeqLog records is stable across runs.
+ val stream = MemoryStream[(Int, String, Option[String], Long)]
+ def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+ val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(projectedDf),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ // Run #1: source projects (id, name, version). Target schema is unchanged.
+ stream.addData((1, "alice", None, 1L))
+ runPipeline(buildCtx(includeEmail = false))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Run #2: source projects (id, name, email, version). `mergeSchemas` appends `email` to
+ // the target (`StructType.merge` keeps the left schema's order and appends right-only
+ // fields); existing rows get NULL for the new column.
+ stream.addData((2, "bob", Some("[email protected]"), 2L))
+ runPipeline(buildCtx(includeEmail = true))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+ Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "[email protected]")
+ )
+ )
+ }
+
+ test("broadening the column selection between runs adds the newly-included column to " +
+ "the target") {
+ val session = spark
+ import session.implicits._
+
+ // Source DF schema is fixed at (id, name, email, version) across both runs. Only the
+ // `columnSelection` knob differs: run #1 includes (id, name, version); run #2 selects
+ // None (= all source columns). mergeSchemas adds `email` to the target via the same
+ // generic SDP path as the new-source-column case, but driven by the
+ // [[ColumnSelection]] knob rather than the source DF's own schema.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, String, String, Long)]
+ def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ columnSelection = selection
+ ))
+ }
+
+ // Run #1: only (id, name, version) selected; `email` is dropped before the MERGE.
+ stream.addData((1, "alice", "ignored", 1L))
+ runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+ Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+ ))))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Run #2: broaden to no selection. mergeSchemas adds `email`; existing rows get NULL,
+ // new rows get the actual value.
+ stream.addData((2, "bob", "[email protected]", 2L))
+ runPipeline(buildCtx(selection = None))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+ Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "[email protected]")
+ )
+ )
+ }
+
+ test("narrowing the column selection between runs preserves the dropped column on " +
+ "existing rows and leaves it NULL on new rows") {
+ val session = spark
+ import session.implicits._
+
+ // Validates the additive-only column-selection contract on the narrowing side:
+ // tightening `columnSelection` between runs leaves the dropped column in place at the
+ // schema level (SDP's `SchemaMergingUtils.mergeSchemas` is a union, never a subtraction).
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, String, String, Long)]
+ def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ columnSelection = selection
+ ))
+ }
+
+ // Run #1: include all columns; populate `email` for key=1.
+ stream.addData((1, "alice", "[email protected]", 1L))
+ runPipeline(buildCtx(selection = None))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", "[email protected]", 1L, cdcMeta(None, Some(1L))))
+ )
+
+ // Run #2: narrow the selection to drop `email`. The merge omits `email` from both
+ // INSERT and UPDATE assignment maps; key=1's `email` is preserved at "[email protected]" while
+ // key=2 is inserted with `email = NULL`.
+ stream.addData((2, "bob", "ignored", 2L))
+ runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+ Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+ ))))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice", "[email protected]", 1L, cdcMeta(None, Some(1L))),
+ Row(2, "bob", null, 2L, cdcMeta(None, Some(2L)))
+ )
+ )
+ }
+
+ test("a top-level column dropped from the source DF between runs is preserved on " +
+ "existing rows and left NULL on new rows") {
+ val session = spark
+ import session.implicits._
+
+ // Symmetric to the new-source-column case (which exercises the source DF *gaining* a
+ // column). Validates that the additive-only column-selection contract holds when the
+ // narrowing is driven by the source DF's own schema shrinking, rather than by a
+ // tightening [[ChangeArgs.columnSelection]].
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ // Same `MemoryStream[(Int, String, Option[String], Long)]` shape across runs; runs
+ // differ in whether `email` is kept in the projected source DF.
+ val stream = MemoryStream[(Int, String, Option[String], Long)]
+ def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+ val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(projectedDf),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ // Run #1: wide source DF (id, name, email, version). mergeSchemas appends `email` to
+ // the target.
+ stream.addData((1, "alice", Some("[email protected]"), 1L))
+ runPipeline(buildCtx(includeEmail = true))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "[email protected]"))
+ )
+
+ // Run #2: source DF drops `email` upstream of the flow. Target still has `email`
+ // (`StructType.merge` is additive-only); the merge omits `email` from both INSERT and
+ // UPDATE assignment maps. Key=1's `email` is preserved at "[email protected]"; key=2 is inserted
+ // with `email = NULL`.
+ stream.addData((2, "bob", None, 2L))
+ runPipeline(buildCtx(includeEmail = false))
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "[email protected]"),
+ Row(2, "bob", 2L, cdcMeta(None, Some(2L)), null)
+ )
+ )
+ }
+
+ test("dropping a nested struct field between runs fails with INCOMPATIBLE_DATA_FOR_TABLE") {
+ val session = spark
+ import session.implicits._
+
+ // The v2 writer's column-resolution layer requires every nested target field to be
+ // present in the microbatch DF. When run #2's source projection drops `b.c`, the merge
+ // fails with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA. Users who want to drop a
+ // nested field between runs must full-refresh the target.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+ s"value STRUCT<a:INT,b:STRUCT<c:INT,d:INT>>, $cdcMetadataDdl)"
+ )
+
+ // Stream is (key, version, a, b_c, b_d). Each run reshapes these columns into a
+ // differently-shaped `value` struct; the underlying tuple shape is unchanged, so the
+ // streaming source's identity is stable across runs.
+ val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+ def buildCtx(includeC: Boolean): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeC) {
+ functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_d").as("d"))
+ }
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.struct(functions.col("a"), inner.as("b")).as("value")
+ )
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(projected),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+ runPipeline(buildCtx(includeC = true))
+
+ // Run #2 drops b.c. The v2 writer rejects the merge because it cannot find data for
+ // the target's `value.b.c` column.
+ stream.addData((1, 2L, 10, 99, 10), (3, 1L, 3, 99, 3))
+ val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeC = false)) }
+ // The V2 writer's `TableOutputResolver` produces this error during plan analysis with
+ // an empty `tableName` because the merge plan it analyzes does not carry the target's
+ // catalog identifier through to the resolver call site.
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> "``",
+ "colName" -> "`value`.`b`.`c`"
+ )
+ )
+ }
+
+ test("a new field added inside an array<struct> element between runs is added to the " +
+ "target") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+ s"vals ARRAY<STRUCT<a:INT,b:STRUCT<c:INT>>>, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+ def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeD) {
+ functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_c").as("c"))
+ }
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.array(
+ functions.struct(functions.col("a"), inner.as("b"))
+ ).as("vals")
+ )
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(projected),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ stream.addData((1, 1L, 1, 1, 99))
+ runPipeline(buildCtx(includeD = false))
+
+ // Run #2 widens to include b.d. Existing key=1 row's vals[0].b.d is NULL until the
+ // upsert at version=2 writes the new value.
+ stream.addData((1, 2L, 1, 1, 2), (3, 1L, 3, 3, 3))
+ runPipeline(buildCtx(includeD = true))
+
+ // Inline-explode flattens the array<struct> for assertion.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target")
+ .selectExpr("key", "inline(vals) as (a, b)")
+ .select("key", "a", "b.c", "b.d"),
+ Seq(
+ Row(1, 1, 1, 2),
+ Row(3, 3, 3, 3)
+ )
+ )
+ }
+
+ test("dropping a field inside an array<struct> element between runs fails with " +
+ "INCOMPATIBLE_DATA_FOR_TABLE") {
+ val session = spark
+ import session.implicits._
+
+ // Symmetric to the nested-struct case, but for `array<struct>`. The v2 writer rejects
+ // the merge because it cannot find data for the target's `vals.element.b.d` column
+ // when run #2's projection drops `d` from the element struct. Users must full-refresh
+ // the target to drop a nested array-element field.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+ s"vals ARRAY<STRUCT<a:INT,b:STRUCT<c:INT,d:INT>>>, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+ def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeD) {
+ functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_c").as("c"))
+ }
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.array(
+ functions.struct(functions.col("a"), inner.as("b"))
+ ).as("vals")
+ )
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(projected),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+ runPipeline(buildCtx(includeD = true))
+
+ stream.addData((1, 2L, 10, 10, 99), (3, 1L, 3, 3, 99))
+ val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeD = false)) }
+ // See the nested-struct test above for why `tableName` is empty here.
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> "``",
+ "colName" -> "`vals`.`element`.`b`.`d`"
+ )
+ )
+ }
+
+ test("a source DF column whose name differs from the target only by case fails with " +
+ "AMBIGUOUS_REFERENCE under case-insensitive resolution") {
+ val session = spark
+ import session.implicits._
+
+ // `DatasetManager`'s schema-merge compares the existing target schema and the flow's
+ // output schema *case-sensitively*: `SchemaMergingUtils.mergeSchemas` calls
+ // `StructType.merge` without forwarding the session-level case-sensitivity. When the
+ // target has `value` and the source DF emits `Value`, the merged schema ends up with
+ // both as separate columns. Reference resolution downstream is case-insensitive
+ // (Spark's default), so the MERGE plan trips on the duplicate and reports
+ // AMBIGUOUS_REFERENCE.
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(key INT NOT NULL, version BIGINT NOT NULL, value STRING, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, Long, String)]
+ stream.addData((1, 1L, "alice"))
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ // Source DF emits `Value` (capital), differing only in case from the target's
+ // `value` column.
+ val df = stream.toDF().toDF("key", "version", "Value")
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(df),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ // The exact `name` and `referenceNames` parameters depend on internal merge-plan
+ // synthesis; the condition match is the meaningful invariant for this test.
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AMBIGUOUS_REFERENCE",
+ parameters = Map(
+ "name" -> ".*",
+ "referenceNames" -> ".*"
+ ),
+ matchPVals = true,
+ queryContext = Array(
+ ExpectedContext(
+ fragment = s"`$catalog`.`$namespace`.`target`.`Value`",
+ start = 0,
+ stop = 27
+ )
+ )
+ )
+ }
+ }
+
+ test("extra columns on the target that the AutoCDC flow does not emit are preserved " +
+ "across the merge") {
+ val session = spark
+ import session.implicits._
+
+ // The target is wider than the AutoCDC flow's source DF: column `extra` is present on
+ // the target but never produced by the flow. AutoCDC must tolerate the extra target
+ // column -- pre-existing rows keep their `extra` value, and newly-inserted rows
+ // resolve `extra` to NULL.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, extra INT, $cdcMetadataDdl)"
+ )
+ insertPreloadedRow(
+ s"$catalog.$namespace.target",
+ colValues = "1, 'preloaded', 0, 42",
+ sequence = 0L
+ )
+
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData((1, "alice", 1L), (2, "bob", 1L))
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target").select("id", "name", "version", "extra"),
+ Seq(
+ Row(1, "alice", 1L, 42), // extra preserved on the upsert
+ Row(2, "bob", 1L, null) // extra is NULL for inserts
+ )
+ )
+ }
+
+ test("changing a non-key column type from TIMESTAMP to STRING between runs fails with " +
+ "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+ val session = spark
+ import session.implicits._
+
+ // `mergeSchemas` rejects an incompatible type change between TIMESTAMP and STRING.
+ // Captured alongside the type-widening / type-narrowing tests; users must full-refresh
+ // the target to change a column's type.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(key INT NOT NULL, version BIGINT NOT NULL, value TIMESTAMP, $cdcMetadataDdl)"
+ )
+
+ val stream1 = MemoryStream[(Int, Long, Timestamp)]
+ stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00")))
+ val ctx1 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx1)
+
+ // Run #2 emits `value` as STRING. mergeSchemas rejects the type change.
+ val stream2 = MemoryStream[(Int, Long, String)]
+ stream2.addData((1, 2L, "2024-01-02 11:00:00"))
+ val ctx2 = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")),
+ keys = Seq("key"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+ sqlState = Some("42825"),
+ // `left` is the persisted (run #1) TIMESTAMP type; `right` is run #2's STRING.
+ parameters = Map(
+ "left" -> "\"TIMESTAMP\"",
+ "right" -> "\"STRING\""
+ )
+ )
+ }
+}
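The TIMESTAMP-to-STRING test above hinges on how the persisted target schema (`left`) is merged with the next run's output schema (`right`). The following is a deliberately strict, hypothetical model in plain Scala with no Spark dependency: `Column`, `SchemaMergeSketch`, and the exception message are illustrative assumptions, not the `mergeSchemas` API or Spark's error text. It rejects a type mismatch on a shared column, appends columns that only the new schema has (as when the CDC metadata column is auto-added), and keeps target-only columns such as `extra`. Unlike the real schema merging, it does not model type widening, nested structs, or case-insensitive resolution.

```scala
// Hypothetical column model: types are plain strings, not Spark DataTypes.
final case class Column(name: String, dataType: String)

object SchemaMergeSketch {
  // Merge the persisted schema (`left`) with the new output schema (`right`).
  // Columns present on both sides must have identical types; columns only on the
  // right are appended, and columns only on the left are kept unchanged.
  def merge(left: Seq[Column], right: Seq[Column]): Seq[Column] = {
    val leftByName = left.map(c => c.name -> c).toMap
    right.foreach { r =>
      leftByName.get(r.name).foreach { l =>
        if (l.dataType != r.dataType) {
          // Illustrative message only; not Spark's actual error text.
          throw new IllegalArgumentException(
            s"incompatible types for ${r.name}: left=${l.dataType}, right=${r.dataType}")
        }
      }
    }
    left ++ right.filterNot(r => leftByName.contains(r.name))
  }
}
```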
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
new file mode 100644
index 000000000000..f06b8c461533
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+ ChangeArgs,
+ ColumnSelection,
+ ScdType,
+ UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Smoke tests for AutoCDC SCD type 1 flows running within a single pipeline: one
+ * [[DataflowGraph]] / [[TestPipelineUpdateContext]] executes one or more AutoCDC flows,
+ * and the target table contents are asserted at the end. Multi-pipeline scenarios (where
+ * multiple pipelines write to the same target) live in [[AutoCdcScd1MultiPipelineSuite]].
+ */
+class AutoCdcScd1SinglePipelineSuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("an upsert event lands a new row in an empty target table") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData((1, "alice", 1L))
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+
+ runPipeline(ctx)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+ )
+ }
+
+ test("consecutive upsert, delete, and re-upsert events for the same key in one run " +
+ "converge to the latest event") {
+ val session = spark
+ import session.implicits._
+
+ // Target schema deliberately omits `is_delete`: the source carries it as a control
+ // column, drives the deleteCondition, and is excluded from the target projection.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, String, Long, Boolean)]
+ stream.addData(
+ (1, "alice", 1L, false), // initial upsert
+ (1, "alice", 2L, true), // delete
+ (1, "alice2", 3L, false) // reinsert
+ )
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ deleteCondition = Some(functions.col("is_delete") === true),
+ columnSelection = Some(ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("is_delete"))
+ ))
+ ))
+ }
+
+ runPipeline(ctx)
+
+ // After all three events at seqs 1, 2, 3: row "alice2" wins as the highest-sequenced
+ // upsert; the delete at seq=2 is superseded by the seq=3 upsert.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice2", 3L, cdcMeta(None, Some(3L))))
+ )
+ }
+
+ test("two AutoCDC flows targeting separate tables in one pipeline produce independent " +
+ "results") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_a " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.t_b " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val streamA = MemoryStream[(Int, Long)]
+ val streamB = MemoryStream[(Int, Long)]
+ streamA.addData((1, 1L), (2, 1L))
+ streamB.addData((10, 1L))
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+ registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "flow_a",
+ target = "t_a",
+ query = dfFlowFunc(streamA.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ registerFlow(autoCdcFlow(
+ name = "flow_b",
+ target = "t_b",
+ query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_a"),
+ Seq(Row(1, 1L, cdcMeta(None, Some(1L))), Row(2, 1L, cdcMeta(None, Some(1L))))
+ )
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.t_b"),
+ Seq(Row(10, 1L, cdcMeta(None, Some(1L))))
+ )
+ assert(spark.catalog.tableExists(auxTableNameFor("t_a")))
+ assert(spark.catalog.tableExists(auxTableNameFor("t_b")))
+ }
+
+ test("an AutoCDC flow targeting a table whose format does not support row-level " +
+ "operations fails with AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE") {
+ val session = spark
+ import session.implicits._
+
+ // Intentionally use a non-merge-compatible catalog, whose default table format is parquet.
+ val catalog = TestGraphRegistrationContext.DEFAULT_CATALOG
+ val database = TestGraphRegistrationContext.DEFAULT_DATABASE
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$database.target_no_merge " +
+ s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+
+ val stream = MemoryStream[(Int, Long)]
+ stream.addData((1, 1L))
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target_no_merge")
+ registerFlow(AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+ destinationIdentifier = fullyQualifiedIdentifier("target_no_merge"),
+ func = dfFlowFunc(stream.toDF().toDF("id", "version")),
+ queryContext = QueryContext(
+ currentCatalog = Some(catalog),
+ currentDatabase = Some(database)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = functions.col("version"),
+ storedAsScdType = ScdType.Type1
+ )
+ ))
+ }
+
+ val ex = intercept[RuntimeException] { runPipeline(ctx) }
+ checkErrorInPipelineFailure(
+ failure = ex,
+ condition = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE",
+ sqlState = Some("0A000"),
+ parameters = Map(
+ "tableName" -> s"`$catalog`.`$database`.`target_no_merge`",
+ "format" -> "parquet"
+ )
+ )
+ }
+}
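The consecutive upsert/delete/re-upsert test above expects the highest-sequenced event per key to win within a single run. A minimal plain-Scala sketch of that rule, assuming a hypothetical `ChangeEvent` shape and an in-memory `Map` standing in for the target table, might look like this. It is not how `Scd1BatchProcessor` is implemented: it omits the delete bookkeeping that the auxiliary table provides and any comparison against sequences already stored on the target.

```scala
// Hypothetical change-event shape; field names are illustrative only.
final case class ChangeEvent(key: Int, sequence: Long, name: String, isDelete: Boolean)

object Scd1BatchSketch {
  // Keep only the highest-sequenced event for each key in the batch.
  def latestPerKey(batch: Seq[ChangeEvent]): Map[Int, ChangeEvent] =
    batch.groupBy(_.key).map { case (key, events) => key -> events.maxBy(_.sequence) }

  // Apply each key's winning event to an in-memory target (key -> (name, sequence)):
  // a winning delete removes the key, a winning upsert overwrites it.
  def applyBatch(
      target: Map[Int, (String, Long)],
      batch: Seq[ChangeEvent]): Map[Int, (String, Long)] =
    latestPerKey(batch).values.foldLeft(target) { (acc, event) =>
      if (event.isDelete) acc - event.key
      else acc.updated(event.key, (event.name, event.sequence))
    }
}
```

Collapsing to one event per key before touching the target keeps the result independent of the order in which events arrive within the batch.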
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
new file mode 100644
index 000000000000..46f8ee47db02
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's behavior when the target table is pre-populated by something
+ * other than a prior AutoCDC run: pre-loaded rows, missing CDC metadata column on the
+ * target, and rows with NULL CDC metadata. These cases verify that AutoCDC interoperates
+ * gracefully with users who hand-populate the target table.
+ */
+class AutoCdcScd1TargetTableDurabilitySuite
+ extends ExecutionTest
+ with SharedSparkSession
+ with AutoCdcGraphExecutionTestMixin {
+
+ test("pre-loaded rows: an event with a lower sequence is suppressed and a higher one " +
+ "wins") {
+ val session = spark
+ import session.implicits._
+
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 5", 5L)
+ insertPreloadedRow(s"$catalog.$namespace.target", "2, 'bob', 5", 5L)
+
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData(
+ (1, "stale", 2L), // < pre-existing seq=5 -> ignored
+ (2, "bob2", 10L) // > pre-existing seq=5 -> upserts
+ )
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(
+ Row(1, "alice", 5L, cdcMeta(None, Some(5L))),
+ Row(2, "bob2", 10L, cdcMeta(None, Some(10L)))
+ )
+ )
+ }
+
+ test("pre-loaded target rows merge correctly on the first AutoCDC run, and the " +
+ "auxiliary table is created lazily") {
+ val session = spark
+ import session.implicits._
+
+ // Target was populated by some external process; this is the first AutoCDC run.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+ )
+ insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 1", 1L)
+
+ assert(
+ !spark.catalog.tableExists(auxTableNameFor("target")),
+ "Auxiliary table should not exist before the first AutoCDC run"
+ )
+
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData((1, "bob", 2L))
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ // seq=2 > pre-existing seq=1, so "bob" replaces "alice" via the upsert sequence column.
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "bob", 2L, cdcMeta(None, Some(2L))))
+ )
+ assert(
+ spark.catalog.tableExists(auxTableNameFor("target")),
+ "Auxiliary table should be created lazily on the first AutoCDC run"
+ )
+ }
+
+ test("a target table created without the CDC metadata column gets the column " +
+ "auto-added on the first AutoCDC run") {
+ val session = spark
+ import session.implicits._
+
+ // User creates the target without the AutoCDC metadata column. DatasetManager evolves
+ // the existing table schema by merging it with the AutoCdcMergeFlow's output schema,
+ // which includes the metadata column. The first run therefore proceeds normally, and
+ // subsequent reads see the metadata struct alongside the user's data columns.
+ spark.sql(
+ s"CREATE TABLE $catalog.$namespace.target " +
+ s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL)"
+ )
+
+ val stream = MemoryStream[(Int, String, Long)]
+ stream.addData((1, "alice", 1L))
+
+ val ctx = new TestGraphRegistrationContext(spark) {
+ registerTable("target", catalog = Some(catalog), database = Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = "auto_cdc_flow",
+ target = "target",
+ query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+ keys = Seq("id"),
+ sequencing = functions.col("version")
+ ))
+ }
+ runPipeline(ctx)
+
+ val schema = spark.table(s"$catalog.$namespace.target").schema
+ assert(
+ schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName),
+ s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " +
+ s"got ${schema.fieldNames.toSeq}"
+ )
+ checkAnswer(
+ spark.table(s"$catalog.$namespace.target"),
+ Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+ )
+ }
+}
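The pre-loaded-rows tests above rely on comparing each incoming event's sequence with the sequence already recorded on the target row. As a hedged sketch, with hypothetical `TargetRow` and `Upsert` types and a `Map` in place of the table, that guard could be modeled as below. Treating an equal sequence as stale is an assumption; the tests above only exercise strictly lower and strictly higher sequences.

```scala
// Hypothetical models; `sequence` stands in for the upsert sequence AutoCDC records per row.
final case class TargetRow(name: String, sequence: Long)
final case class Upsert(key: Int, name: String, sequence: Long)

object SequenceGuardSketch {
  // Apply upserts in order, skipping any event whose sequence does not exceed the
  // sequence already stored for that key (assumption: ties are treated as stale).
  def merge(target: Map[Int, TargetRow], upserts: Seq[Upsert]): Map[Int, TargetRow] =
    upserts.foldLeft(target) { (acc, u) =>
      acc.get(u.key) match {
        case Some(existing) if existing.sequence >= u.sequence => acc // stale: suppressed
        case _ => acc.updated(u.key, TargetRow(u.name, u.sequence)) // new or newer: wins
      }
    }
}
```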
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index f19fed4e5780..8dad5019c0fe 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -660,7 +660,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with SharedSparkSession {
) {
// Temporary views in SDP normally accept either streaming or batch producing flows, but
// AutoCDC flows are an explicit exception: SCD reconciliation only runs at the
- // streaming-table sink (`Scd1ForeachBatchExec`), so pointing an AutoCDC flow at a view
+ // streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an AutoCDC flow at a view
// would silently drop reconciliation and expose just the projected CDF to consumers.
// `validateFlowStreamingness` rejects this case with a dedicated sub-condition under
// INVALID_FLOW_QUERY_TYPE.