This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 9da419233dfa [SPARK-57080][SDP] Register AutoCDC Flows from
`PipelinesHandler`
9da419233dfa is described below
commit 9da419233dfa9f104e3276cb0e6a4232b66aab4c
Author: AnishMahto <[email protected]>
AuthorDate: Wed May 27 11:03:55 2026 -0700
[SPARK-57080][SDP] Register AutoCDC Flows from `PipelinesHandler`
### What changes were proposed in this pull request?
In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow`
proto is received with `AUTO_CDC_FLOW_DETAILS`.
This is the final step in connecting a Spark Connect client to the Spark
Connect server's SDP engine for AutoCDC flows. With these changes, a user
should be able to run SDP pipelines that contain AutoCDC flows using the
pipelines CLI runner.
### Why are the changes needed?
Allows Spark Connect clients to actually register and execute AutoCDC flows
within their pipelines.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested graph registration and construction through the Python client in
`PythonPipelineSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh
Closes #56124 from
AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler.
Lead-authored-by: AnishMahto <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit b848baaee6c7b66ed60cd0747539ac8fa8cb56ed)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 26 +-
.../sql/connect/pipelines/PipelinesHandler.scala | 143 +++++++-
.../sql/connect/planner/SparkConnectPlanner.scala | 3 +-
.../connect/pipelines/PythonPipelineSuite.scala | 400 ++++++++++++++++++++-
.../spark/sql/pipelines/autocdc/ChangeArgs.scala | 8 +-
.../sql/pipelines/autocdc/ChangeArgsSuite.scala | 6 +-
6 files changed, 554 insertions(+), 32 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index defa7424d268..d66692a3b22e 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -191,6 +191,12 @@
],
"sqlState" : "0A000"
},
+ "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST" : {
+ "message" : [
+ "AutoCDC flow specifies both `column_list` and `except_column_list`; at
most one may be provided."
+ ],
+ "sqlState" : "42613"
+ },
"AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the following columns
are not present in the <schemaName> schema: <missingColumns>. Available
columns: <availableColumns>."
@@ -232,11 +238,23 @@
},
"sqlState" : "22000"
},
+ "AUTOCDC_MISSING_SEQUENCE_BY" : {
+ "message" : [
+ "AutoCDC flow is missing a required `sequence_by` expression. Specify a
`sequence_by` column or expression that orders incoming change events."
+ ],
+ "sqlState" : "22023"
+ },
+ "AUTOCDC_MISSING_SOURCE" : {
+ "message" : [
+ "AutoCDC flow is missing a required `source` table name. Specify the
name of the streaming source table the flow should read from."
+ ],
+ "sqlState" : "22023"
+ },
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier
<columnName> (parts: <nameParts>)."
],
- "sqlState" : "42703"
+ "sqlState" : "22023"
},
"AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
"message" : [
@@ -244,6 +262,12 @@
],
"sqlState" : "42000"
},
+ "AUTOCDC_NON_COLUMN_IDENTIFIER" : {
+ "message" : [
+ "Expected a column identifier; got the non-attribute expression
`<expression>`. AutoCDC keys, sequence_by, column_list, and except_column_list
must reference unqualified column names."
+ ],
+ "sqlState" : "22023"
+ },
"AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
"The column `<columnName>` in the <schemaName> schema collides with the
reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using
<caseSensitivity> column name comparison). Rename or remove the column."
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 04dbc1a45506..f8edbc992800 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.connect.pipelines
-import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.util.Using
@@ -25,16 +24,21 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse,
PipelineCommandResult, Relation, ResolvedIdentifier}
+import
org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Column}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute,
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace,
CreateTable, CreateTableAsSelect, CreateView, DescribeRelation,
DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan,
RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties,
ShowTables, ShowViews}
+import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand,
ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ColumnSelection,
ScdType, UnqualifiedColumnName}
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
-import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis,
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables,
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink,
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter,
TemporaryView, UntypedFlow}
+import org.apache.spark.sql.pipelines.graph.{AllTables, AutoCdcFlow,
FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext,
IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext,
QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables,
SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType
@@ -52,6 +56,8 @@ private[connect] object PipelinesHandler extends Logging {
* @param transformRelationFunc
* Function used to convert a relation to a LogicalPlan. This is used when
determining the
* LogicalPlan that a flow returns.
+ * @param transformExpressionFunc
+ * Function used to convert a proto expression to a Catalyst expression.
* @return
* The response after handling the command
*/
@@ -59,7 +65,8 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder,
cmd: proto.PipelineCommand,
responseObserver: StreamObserver[ExecutePlanResponse],
- transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult =
{
+ transformRelationFunc: Relation => LogicalPlan,
+ transformExpressionFunc: proto.Expression => Expression):
PipelineCommandResult = {
// Currently most commands do not include any information in the response.
We just send back
// an empty response to the client to indicate that the command was
handled successfully
val defaultResponse = PipelineCommandResult.getDefaultInstance
@@ -99,7 +106,11 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW =>
logInfo(s"Define pipelines flow cmd received: $cmd")
val resolvedFlow =
- defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder)
+ defineFlow(
+ cmd.getDefineFlow,
+ transformRelationFunc,
+ transformExpressionFunc,
+ sessionHolder)
val identifierBuilder = ResolvedIdentifier.newBuilder()
resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName)
resolvedFlow.database.foreach { ns =>
@@ -315,6 +326,7 @@ private[connect] object PipelinesHandler extends Logging {
private def defineFlow(
flow: proto.PipelineCommand.DefineFlow,
transformRelationFunc: Relation => LogicalPlan,
+ transformExpressionFunc: proto.Expression => Expression,
sessionHolder: SessionHolder): TableIdentifier = {
if (flow.hasOnce) {
throw new AnalysisException(
@@ -379,22 +391,125 @@ private[connect] object PipelinesHandler extends Logging
{
sqlConf = flow.getSqlConfMap.asScala.toMap,
once = false,
queryContext = QueryContext(Option(defaultCatalog),
Option(defaultDatabase)),
- origin = QueryOrigin(
- filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
- flow.getSourceCodeLocation.getFileName),
- line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
- flow.getSourceCodeLocation.getLineNumber),
- objectType = Some(QueryOriginType.Flow.toString),
- objectName = Option(flowIdentifier.unquotedString),
- language = Some(Python()))))
+ origin = flowOrigin(flow, flowIdentifier)))
case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS
=>
- throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet
implemented.")
+ graphElementRegistry.registerFlow(
+ buildAutoCdcFlow(
+ autoCdcDetails = flow.getAutoCdcFlowDetails,
+ flow = flow,
+ flowIdentifier = flowIdentifier,
+ destinationIdentifier = destinationIdentifier,
+ defaultCatalog = defaultCatalog,
+ defaultDatabase = defaultDatabase,
+ sessionHolder = sessionHolder,
+ transformExpressionFunc = transformExpressionFunc))
case other =>
throw new UnsupportedOperationException(s"Unsupported DefineFlow
details case: $other")
}
flowIdentifier
}
+ /**
+ * Build an [[AutoCdcFlow]] from the proto-supplied AutoCDC flow details.
+ *
+ * The flow's source expression is encoded by the Python client as a
streaming-table name; we
+ * model that on the server side as a streaming [[UnresolvedRelation]] so
that pipelines flow
+ * analysis (which already handles `STREAM(t)` references) can resolve it
against the rest of
+ * the dataflow graph.
+ */
+ private def buildAutoCdcFlow(
+ autoCdcDetails: AutoCdcFlowDetails,
+ flow: proto.PipelineCommand.DefineFlow,
+ flowIdentifier: TableIdentifier,
+ destinationIdentifier: TableIdentifier,
+ defaultCatalog: String,
+ defaultDatabase: String,
+ sessionHolder: SessionHolder,
+ transformExpressionFunc: proto.Expression => Expression): AutoCdcFlow = {
+ // TODO(SPARK-57092): apply_as_truncates is declared on AutoCdcFlowDetails
but is not yet
+ // honored by the engine; wire it through once SCD1 truncate support
lands.
+ // TODO(SPARK-57093): ignore_null_updates_column_list and
ignore_null_updates_except_column_list
+ // are declared on AutoCdcFlowDetails but are not yet honored by the
engine; wire them
+ // through once SCD1 ignore-null support lands.
+
+ if (!autoCdcDetails.hasSource) {
+ throw new AnalysisException("AUTOCDC_MISSING_SOURCE", Map.empty)
+ }
+ if (!autoCdcDetails.hasSequenceBy) {
+ throw new AnalysisException("AUTOCDC_MISSING_SEQUENCE_BY", Map.empty)
+ }
+
+ val sourcePlan: LogicalPlan = UnresolvedRelation(
+ multipartIdentifier = GraphIdentifierManager
+ .parseTableIdentifier(name = autoCdcDetails.getSource, spark =
sessionHolder.session)
+ .nameParts,
+ isStreaming = true)
+
+ val toColumn: proto.Expression => Column = expr =>
Column(transformExpressionFunc(expr))
+
+ val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName =
expr =>
+ transformExpressionFunc(expr) match {
+ case a: UnresolvedAttribute => UnqualifiedColumnName(a.nameParts)
+ case other =>
+ throw new AnalysisException(
+ "AUTOCDC_NON_COLUMN_IDENTIFIER",
+ Map("expression" -> other.sql))
+ }
+
+ val keys =
autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName)
+
+ val columnSelection: Option[ColumnSelection] = {
+ val included = autoCdcDetails.getColumnListList.asScala.toSeq
+ val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq
+ if (included.nonEmpty && excluded.nonEmpty) {
+ throw new
AnalysisException("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", Map.empty)
+ } else if (included.nonEmpty) {
+
Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName)))
+ } else if (excluded.nonEmpty) {
+
Some(ColumnSelection.ExcludeColumns(excluded.map(asUnqualifiedColumnName)))
+ } else {
+ None
+ }
+ }
+
+ // Get user specified SCD type, or default to SCD1 if unspecified.
+ val scdType: ScdType = autoCdcDetails.getStoredAsScdType match {
+ case proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1 |
+ proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_UNSPECIFIED =>
+ ScdType.Type1
+ case other =>
+ throw new UnsupportedOperationException(s"Unsupported AutoCDC SCD
type: $other")
+ }
+
+ val changeArgs = ChangeArgs(
+ keys = keys,
+ sequencing = toColumn(autoCdcDetails.getSequenceBy),
+ storedAsScdType = scdType,
+ deleteCondition =
+
Option.when(autoCdcDetails.hasApplyAsDeletes)(toColumn(autoCdcDetails.getApplyAsDeletes)),
+ columnSelection = columnSelection)
+
+ AutoCdcFlow(
+ identifier = flowIdentifier,
+ destinationIdentifier = destinationIdentifier,
+ func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan),
+ sqlConf = flow.getSqlConfMap.asScala.toMap,
+ queryContext = QueryContext(Option(defaultCatalog),
Option(defaultDatabase)),
+ origin = flowOrigin(flow, flowIdentifier),
+ changeArgs = changeArgs)
+ }
+
+ private def flowOrigin(
+ flow: proto.PipelineCommand.DefineFlow,
+ flowIdentifier: TableIdentifier): QueryOrigin = QueryOrigin(
+ filePath =
+
Option.when(flow.getSourceCodeLocation.hasFileName)(flow.getSourceCodeLocation.getFileName),
+ line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
+ flow.getSourceCodeLocation.getLineNumber),
+ objectType = Some(QueryOriginType.Flow.toString),
+ objectName = Option(flowIdentifier.unquotedString),
+ language = Some(Python()))
+
private def startRun(
cmd: proto.PipelineCommand.StartRun,
responseObserver: StreamObserver[ExecutePlanResponse],
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index db78dc1744ec..c84eaadaa453 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2955,7 +2955,8 @@ class SparkConnectPlanner(
sessionHolder,
command,
responseObserver,
- transformRelation)
+ transformRelation,
+ transformExpression)
executeHolder.eventsManager.postFinished()
responseObserver.onNext(
proto.ExecutePlanResponse
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index fd05b0cc357e..834e2d8144e1 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -29,15 +29,18 @@ import scala.util.Try
import org.scalactic.source.Position
import org.scalatest.Tag
+import org.apache.spark.SparkConf
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.ColumnConversions._
import org.apache.spark.sql.connect.PythonTestDepsChecker
import org.apache.spark.sql.connect.service.SparkConnectService
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, TableCatalog}
import org.apache.spark.sql.pipelines.Language.Python
+import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, ScdType,
UnqualifiedColumnName}
import org.apache.spark.sql.pipelines.common.FlowStatus
-import org.apache.spark.sql.pipelines.graph.{DataflowGraph,
PipelineUpdateContextImpl, QueryOrigin, QueryOriginType}
+import org.apache.spark.sql.pipelines.graph.{AutoCdcFlow, AutoCdcMergeFlow,
DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType}
import org.apache.spark.sql.pipelines.logging.EventLevel
import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers,
TestPipelineUpdateContextMixin}
import org.apache.spark.sql.types.StructType
@@ -51,10 +54,24 @@ class PythonPipelineSuite
with TestPipelineUpdateContextMixin
with EventVerificationTestHelpers {
- def buildGraph(pythonText: String): DataflowGraph = {
+ // Register a V2 in-memory catalog so AutoCDC tests can exercise
pipeline-default-catalog
+ // inheritance against a name that is never the session default
`spark_catalog`. The V2 in-memory
+ // catalog doesn't support streaming reads, but the AutoCDC tests that touch
it only run graph
+ // resolution -- not pipeline execution -- so this is sufficient.
+ override def sparkConf: SparkConf = super.sparkConf
+ .set("spark.sql.catalog.my_catalog", classOf[InMemoryTableCatalog].getName)
+
+ def buildGraph(
+ pythonText: String,
+ defaultCatalog: Option[String] = None,
+ defaultDatabase: Option[String] = None,
+ setupSql: Option[String] = None): DataflowGraph = {
val indentedPythonText = pythonText.linesIterator.map(" " +
_).mkString("\n")
// create a unique identifier to allow identifying the session and
dataflow graph
val customSessionIdentifier = UUID.randomUUID().toString
+ val defaultCatalogPyExpr = defaultCatalog.map(c =>
s""""$c"""").getOrElse("None")
+ val defaultDatabasePyExpr = defaultDatabase.map(d =>
s""""$d"""").getOrElse("None")
+ val setupSqlLine = setupSql.map(stmt =>
s"""spark.sql(\"\"\"$stmt\"\"\")""").getOrElse("")
val pythonCode =
s"""
|from pyspark.sql import SparkSession
@@ -76,10 +93,12 @@ class PythonPipelineSuite
| .config("spark.custom.identifier", "$customSessionIdentifier") \\
| .create()
|
+ |$setupSqlLine
+ |
|dataflow_graph_id = create_dataflow_graph(
| spark,
- | default_catalog=None,
- | default_database=None,
+ | default_catalog=$defaultCatalogPyExpr,
+ | default_database=$defaultDatabasePyExpr,
| sql_conf={},
|)
|
@@ -151,7 +170,7 @@ class PythonPipelineSuite
QueryOrigin(
language = Option(Python()),
filePath = Option("<string>"),
- line = Option(34),
+ line = Option(36),
objectName = Option("spark_catalog.default.table1"),
objectType = Option(QueryOriginType.Flow.toString))),
errorChecker = ex =>
@@ -203,7 +222,7 @@ class PythonPipelineSuite
QueryOrigin(
language = Option(Python()),
filePath = Option("<string>"),
- line = Option(40),
+ line = Option(42),
objectName = Option("spark_catalog.default.mv2"),
objectType = Option(QueryOriginType.Flow.toString))),
expectedEventLevel = EventLevel.INFO)
@@ -217,7 +236,7 @@ class PythonPipelineSuite
QueryOrigin(
language = Option(Python()),
filePath = Option("<string>"),
- line = Option(44),
+ line = Option(46),
objectName = Option("spark_catalog.default.mv"),
objectType = Option(QueryOriginType.Flow.toString))),
expectedEventLevel = EventLevel.INFO)
@@ -235,7 +254,7 @@ class PythonPipelineSuite
QueryOrigin(
language = Option(Python()),
filePath = Option("<string>"),
- line = Option(34),
+ line = Option(36),
objectName = Option("spark_catalog.default.table1"),
objectType = Option(QueryOriginType.Flow.toString))),
expectedEventLevel = EventLevel.INFO)
@@ -249,7 +268,7 @@ class PythonPipelineSuite
QueryOrigin(
language = Option(Python()),
filePath = Option("<string>"),
- line = Option(49),
+ line = Option(51),
objectName = Option("spark_catalog.default.standalone_flow1"),
objectType = Option(QueryOriginType.Flow.toString))),
expectedEventLevel = EventLevel.INFO)
@@ -935,6 +954,367 @@ class PythonPipelineSuite
assert(ex.getMessage.contains("table_with_wrong_struct_schema"))
}
+ private def buildAutoCdcFlow(pipelineSource: String): AutoCdcFlow = {
+ val graph = buildGraph(pipelineSource)
+ graph.flows
+ .collectFirst { case f: AutoCdcFlow => f }
+ .getOrElse(fail(s"Expected an AutoCdcFlow in the graph, got:
${graph.flows}"))
+ }
+
+ test("AutoCDC API: minimal flow registers an AutoCdcFlow with default name
and SCD1 default") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin)
+
+ assert(flow.identifier == graphIdentifier("target"))
+ assert(flow.destinationIdentifier == graphIdentifier("target"))
+ assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+ assert(flow.changeArgs.sequencing.expr.sql == "timestamp")
+ assert(flow.changeArgs.deleteCondition.isEmpty)
+ assert(flow.changeArgs.columnSelection.isEmpty)
+ assert(flow.changeArgs.storedAsScdType == ScdType.Type1)
+ }
+
+ test("AutoCDC API: composite keys are forwarded to ChangeArgs in order") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value", "timestamp"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin)
+
+ assert(
+ flow.changeArgs.keys ==
+ Seq(UnqualifiedColumnName("value"),
UnqualifiedColumnName("timestamp")))
+ }
+
+ test("AutoCDC API: apply_as_deletes is forwarded as a delete condition
column") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | apply_as_deletes = "value % 2 = 0",
+ |)
+ |""".stripMargin)
+
+ val deleteCondition = flow.changeArgs.deleteCondition.getOrElse(
+ fail("expected apply_as_deletes to populate deleteCondition"))
+ assert(deleteCondition.expr.sql.contains("value"))
+ assert(deleteCondition.expr.sql.contains("0"))
+ }
+
+ test("AutoCDC API: column_list is forwarded as IncludeColumns") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | column_list = ["value", "timestamp"],
+ |)
+ |""".stripMargin)
+
+ assert(
+ flow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("value"),
UnqualifiedColumnName("timestamp")))))
+ }
+
+ test("AutoCDC API: except_column_list is forwarded as ExcludeColumns") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | except_column_list = ["timestamp"],
+ |)
+ |""".stripMargin)
+
+ assert(
+ flow.changeArgs.columnSelection.contains(
+
ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("timestamp")))))
+ }
+
+ test("AutoCDC API: explicit `name` is honored as the flow identifier") {
+ val flow = buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | name = "my_flow",
+ |)
+ |""".stripMargin)
+
+ assert(flow.identifier == graphIdentifier("my_flow"))
+ assert(flow.destinationIdentifier == graphIdentifier("target"))
+ }
+
+ test("AutoCDC API: multi-part `keys` column is rejected at flow
registration") {
+ val ex = intercept[RuntimeException] {
+ buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["a.b"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin)
+ }
+ assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER"))
+ }
+
+ test("AutoCDC API: multi-part `column_list` entry is rejected at flow
registration") {
+ val ex = intercept[RuntimeException] {
+ buildAutoCdcFlow("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | column_list = ["nested.field"],
+ |)
+ |""".stripMargin)
+ }
+ assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER"))
+ }
+
+ test("AutoCDC API: Column-object form of keys/sequence_by/apply_as_deletes
is honored") {
+ val flow = buildAutoCdcFlow("""
+ |from pyspark.sql.functions import col, expr
+ |
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = [col("value")],
+ | sequence_by = col("timestamp"),
+ | apply_as_deletes = expr("value % 2 = 0"),
+ |)
+ |""".stripMargin)
+
+ assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+ assert(flow.changeArgs.sequencing.expr.sql == "timestamp")
+ val deleteCondition = flow.changeArgs.deleteCondition.getOrElse(
+ fail("expected apply_as_deletes to populate deleteCondition"))
+ assert(deleteCondition.expr.sql.contains("value"))
+ assert(deleteCondition.expr.sql.contains("0"))
+ }
+
+ test("AutoCDC API: graph resolves with the source streaming table as the
flow's input") {
+ val graph = buildGraph("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin).resolve()
+
+ val resolvedFlow = graph.resolvedFlow(graphIdentifier("target"))
+ assert(resolvedFlow.inputs == Set(graphIdentifier("src")))
+ }
+
+ test("AutoCDC API: single-part `source` inherits the pipeline's default
catalog and database") {
+ // Use `my_catalog` (registered in `sparkConf`) so the pipeline-default
catalog differs from
+ // the session default (`spark_catalog`), and a non-default namespace
`my_db` so the
+ // pipeline-default database differs from the session default (`default`).
The CREATE NAMESPACE
+ // runs on the same Connect session that subsequently creates the dataflow
graph, so the
+ // namespace is visible to that session's per-session V2 catalog instance.
+ val graph = buildGraph(
+ """
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin,
+ defaultCatalog = Some("my_catalog"),
+ defaultDatabase = Some("my_db"),
+ setupSql = Some("CREATE NAMESPACE IF NOT EXISTS
my_catalog.my_db")).resolve()
+
+ val resolvedFlow =
+ graph.resolvedFlow(TableIdentifier("target", Some("my_db"),
Some("my_catalog")))
+ assert(
+ resolvedFlow.inputs ==
+ Set(TableIdentifier("src", Some("my_db"), Some("my_catalog"))))
+ }
+
+ test("AutoCDC API: multi-part `source` resolves to the corresponding
qualified dataset") {
+ val graph = buildGraph("""
+ |@dp.table(name = "some_catalog.some_schema.src")
+ |def irrelevant():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table(name = "some_catalog.some_schema.target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "some_catalog.some_schema.target",
+ | source = "some_catalog.some_schema.src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin).resolve()
+
+ val targetIdent = TableIdentifier("target", Some("some_schema"),
Some("some_catalog"))
+ val srcIdent = TableIdentifier("src", Some("some_schema"),
Some("some_catalog"))
+ val resolvedFlow = graph.resolvedFlow(targetIdent)
+ assert(resolvedFlow.inputs == Set(srcIdent))
+ }
+
+ test("AutoCDC API: non-attribute expression in keys is rejected") {
+ val ex = intercept[RuntimeException] {
+ buildGraph("""
+ |from pyspark.sql.functions import expr
+ |
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = [expr("value + 1")],
+ | sequence_by = "timestamp",
+ |)
+ |""".stripMargin)
+ }
+ assert(ex.getMessage.contains("AUTOCDC_NON_COLUMN_IDENTIFIER"))
+ }
+
+ test("AutoCDC API: specifying both column_list and except_column_list is
rejected") {
+ // The Python create_auto_cdc_flow API does not currently enforce the "at
most one" contract
+ // client-side, so the proto carries both lists to the server, where the
structured error is
+ // raised. If/when a Python-side check is added, this test guards against
the server-side
+ // defense being silently bypassed.
+ val ex = intercept[RuntimeException] {
+ buildGraph("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | column_list = ["value"],
+ | except_column_list = ["timestamp"],
+ |)
+ |""".stripMargin)
+ }
+
assert(ex.getMessage.contains("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST"))
+ }
+
+ test("AutoCDC API: registered flow survives graph resolution and validation
end-to-end") {
+ val graph = buildGraph("""
+ |@dp.table
+ |def src():
+ | return spark.readStream.format("rate").load()
+ |
+ |dp.create_streaming_table("target")
+ |
+ |dp.create_auto_cdc_flow(
+ | target = "target",
+ | source = "src",
+ | keys = ["value"],
+ | sequence_by = "timestamp",
+ | apply_as_deletes = "value % 2 = 0",
+ | column_list = ["value", "timestamp"],
+ |)
+ |""".stripMargin).resolve().validate()
+
+ val resolvedFlow = graph.resolvedFlow(graphIdentifier("target"))
+ assert(resolvedFlow.isInstanceOf[AutoCdcMergeFlow])
+ val mergeFlow = resolvedFlow.asInstanceOf[AutoCdcMergeFlow]
+ assert(mergeFlow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+ assert(mergeFlow.changeArgs.sequencing.expr.sql == "timestamp")
+ assert(mergeFlow.changeArgs.deleteCondition.isDefined)
+ assert(
+
mergeFlow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("value"),
UnqualifiedColumnName("timestamp")))))
+ assert(mergeFlow.changeArgs.storedAsScdType == ScdType.Type1)
+ }
+
/**
* Executes Python code in a separate process and returns the exit code.
*
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index c475377ba506..49636acc1f8f 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -32,14 +32,16 @@ case class UnqualifiedColumnName private (name: String) {
}
object UnqualifiedColumnName {
- def apply(input: String): UnqualifiedColumnName = {
- val nameParts = CatalystSqlParser.parseMultipartIdentifier(input)
+ def apply(nameParts: Seq[String]): UnqualifiedColumnName = {
if (nameParts.length != 1) {
- throw multipartColumnIdentifierError(input, nameParts)
+ throw multipartColumnIdentifierError(nameParts.mkString("."), nameParts)
}
new UnqualifiedColumnName(nameParts.head)
}
+ def apply(input: String): UnqualifiedColumnName =
+ apply(CatalystSqlParser.parseMultipartIdentifier(input))
+
private def multipartColumnIdentifierError(
columnName: String,
nameParts: Seq[String]
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
index 1de2120a8f91..7be111003762 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
@@ -326,7 +326,7 @@ class ChangeArgsSuite extends SparkFunSuite with
SharedSparkSession {
UnqualifiedColumnName("a.b")
},
condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
- sqlState = "42703",
+ sqlState = "22023",
parameters = Map(
"columnName" -> "a.b",
"nameParts" -> "a, b"
@@ -340,7 +340,7 @@ class ChangeArgsSuite extends SparkFunSuite with
SharedSparkSession {
UnqualifiedColumnName("src.x")
},
condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
- sqlState = "42703",
+ sqlState = "22023",
parameters = Map(
"columnName" -> "src.x",
"nameParts" -> "src, x"
@@ -354,7 +354,7 @@ class ChangeArgsSuite extends SparkFunSuite with
SharedSparkSession {
UnqualifiedColumnName("a.b.c")
},
condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
- sqlState = "42703",
+ sqlState = "22023",
parameters = Map(
"columnName" -> "a.b.c",
"nameParts" -> "a, b, c"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]