This is an automated email from the ASF dual-hosted git repository.
szehon-ho pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new f56d2bf6bb56 [SPARK-56956][SPARK-56651][CONNECT][SDP][FOLLOWUP]
Address review comments for AutoCDC flow dataclasses and Python APIs
f56d2bf6bb56 is described below
commit f56d2bf6bb5602b5a4d97d5c7fa54fb26c6f5e91
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Jun 1 12:43:38 2026 -0700
[SPARK-56956][SPARK-56651][CONNECT][SDP][FOLLOWUP] Address review comments
for AutoCDC flow dataclasses and Python APIs
This follow-up PR addresses review comments left after #56042 (SPARK-56956,
AutoCDC flow dataclasses) and #56069 (SPARK-56651, AutoCDC Python APIs) merged.
### What changes were proposed in this pull request?
#### Scala — `Scd1BatchProcessor` / `Flow`
- Remove the now-dead
`Scd1BatchProcessor.validateCdcMetadataColumnNotPresent` validator and its call
site. It referenced the error class `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT`,
which the parent PR removed from `error-conditions.json`; the new
construction-time check in
`AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns` is the
authoritative validator and supersedes it.
- Reorder `AutoCdcFlow`'s constructor so defaulted params trail the
non-defaulted ones (`origin`, `changeArgs`), allowing positional construction,
and drop the `comment` parameter.
- Fix Scaladoc/comment text: factual wording for the keys-presence check,
the `[[ResolvedFlow.load]]` link, the `Scd1ForeachBatchHandler` reference (was
`Scd1ForeachBatchExec`, which does not exist), and several minor
grammar/typography nits.
#### Python — `create_auto_cdc_flow` / `AutoCdcFlow`
- Add a comment on the lazy `pyspark.sql.connect.functions.builtin` imports
explaining the docs-build constraint (transitive grpc dependency missing from
the docs environment), so a future refactor doesn't hoist the imports and
silently break docs CI.
- Fix the `INVALID_MULTIPLE_ARGUMENT_CONDITIONS` error template
placeholder: `[{arg_names}]` → `[<arg_names>]`.
`ErrorClassesReader.get_error_message` extracts required placeholders via
`re.findall("<([a-zA-Z0-9_-]+)>", template)` and asserts the extracted set
equals `messageParameters.keys()`, so the curly-brace form would trip an
`AssertionError` instead of producing the intended `PySparkValueError`. The
typo also affected existing callers in `sql/session.py:2267` and `sql/connect/s
[...]
- Minor docstring fixes: "merge" operation (was "merged"), consistency
between `api.py` and `flow.py` on the `1`/`"1"` accepted set, missing commas,
"excluded from" (was "excluded in"), `DataFrame` casing, and a backtick on
`create_streaming_table`.
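
The lazy-import constraint described in the Python list can be illustrated with a minimal sketch. It is not the code in `api.py`: the module name `_hypothetical_grpc_like_dep` and both function names are made up for illustration, and the module is assumed to be absent, standing in for a dependency that is missing from the docs environment.

```python
def build_expr(sql_text: str):
    # Lazy import: resolved only when the function is called, so merely
    # importing the enclosing module never touches the optional dependency.
    # `_hypothetical_grpc_like_dep` is a made-up name, assumed not installed.
    import _hypothetical_grpc_like_dep as dep

    return dep.expr(sql_text)


def call_fails_only_at_use() -> bool:
    """Return True if the missing dependency surfaces at call time."""
    try:
        build_expr("op = 'DELETE'")
    except ImportError:
        return True
    return False
```

Defining `build_expr` succeeds even though the dependency is unavailable; the `ImportError` appears only when the function runs. Hoisting the import to module level would instead fail as soon as the module is loaded.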
### Why are the changes needed?
Cleanup of follow-up items identified during review of the parent PRs. The
dead Scala validator is the most material: if its code path were reached, it
would throw an internal `SparkException("Cannot find main error class ...")`
instead of a user-facing `AnalysisException`. The error-template typo would
surface as an `AssertionError` rather than a user-actionable error for the two
existing callers.
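
The template failure mode can be reproduced with a simplified stand-in for the placeholder check. `render` is a hypothetical helper, not `ErrorClassesReader.get_error_message` itself; only the regular expression and the two template strings come from this commit, and the parameter values are invented for the example.

```python
import re

# Placeholder pattern quoted in the commit message.
PLACEHOLDER = re.compile(r"<([a-zA-Z0-9_-]+)>")

OLD_TEMPLATE = "[{arg_names}] cannot be <condition>."
NEW_TEMPLATE = "[<arg_names>] cannot be <condition>."


def render(template: str, message_parameters: dict) -> str:
    """Hypothetical helper: validate placeholders, then substitute them."""
    required = set(PLACEHOLDER.findall(template))
    # Mirrors the described check: the placeholder set must equal the keys.
    assert required == set(message_parameters), (
        f"placeholders {sorted(required)} != keys {sorted(message_parameters)}"
    )
    return PLACEHOLDER.sub(lambda m: str(message_parameters[m.group(1)]), template)


# Invented parameter values, for illustration only.
params = {
    "arg_names": "column_list, except_column_list",
    "condition": "set at the same time",
}

fixed_message = render(NEW_TEMPLATE, params)

try:
    render(OLD_TEMPLATE, params)
    old_template_asserts = False
except AssertionError:
    # `{arg_names}` is invisible to the regex, so only `condition` is found.
    old_template_asserts = True
```

With the curly-brace form the regex extracts only `condition`, so the set comparison fails before any message is built. With the angle-bracket form both placeholders are found and substituted.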
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests in `AutoCdcFlowSuite`, `Scd1BatchProcessorSuite`,
`ConnectInvalidPipelineSuite`, and `ConnectValidPipelineSuite` continue to
cover the affected paths.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored by Claude.
Closes #56113 from cloud-fan/autocdc-flow-dataclasses-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Szehon Ho <[email protected]>
(cherry picked from commit 3e1650b9be0322daa98e7552c488cbbc75914edb)
Signed-off-by: Szehon Ho <[email protected]>
---
python/pyspark/errors/error-conditions.json | 2 +-
python/pyspark/pipelines/api.py | 20 +++++++------
python/pyspark/pipelines/flow.py | 5 ++--
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 24 +---------------
.../apache/spark/sql/pipelines/graph/Flow.scala | 33 +++++++++++-----------
.../sql/pipelines/graph/GraphValidations.scala | 3 +-
.../sql/pipelines/autocdc/AutoCdcFlowSuite.scala | 18 ++++--------
.../graph/ConnectInvalidPipelineSuite.scala | 2 +-
8 files changed, 42 insertions(+), 65 deletions(-)
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 7cc5a73e254b..38417cbf0188 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -417,7 +417,7 @@
},
"INVALID_MULTIPLE_ARGUMENT_CONDITIONS": {
"message": [
- "[{arg_names}] cannot be <condition>."
+ "[<arg_names>] cannot be <condition>."
]
},
"INVALID_NDARRAY_DIMENSION": {
diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py
index 084547f4c2b1..5cbc003708f2 100644
--- a/python/pyspark/pipelines/api.py
+++ b/python/pyspark/pipelines/api.py
@@ -541,8 +541,8 @@ def create_auto_cdc_flow(
) -> None:
"""
Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
- Target table must have already been created using create_streaming_table
function. Only one
- of column_list and except_column_list can be specified.
+ Target table must have already been created using the
`create_streaming_table` function.
+ Only one of column_list and except_column_list can be specified.
Example:
create_auto_cdc_flow(
@@ -576,16 +576,19 @@ def create_auto_cdc_flow(
:param column_list: Columns that will be included in the output table.
This should be a list \
of column identifiers without qualifiers, expressed as either Python
strings or PySpark \
Columns. Only one of column_list and except_column_list can be
specified.
- :param except_column_list: Columns that will be excluded in the output
table. This should be a \
- list of column identifiers without qualifiers, expressed as either
Python strings or \
+ :param except_column_list: Columns that will be excluded from the output
table. This should \
+ be a list of column identifiers without qualifiers, expressed as
either Python strings or \
PySpark Columns. Only one of column_list and except_column_list can be
specified. When \
- this is specified, all columns in the dataframe of the target table
except those in this \
- list will be in the output table.
+ this is specified, all columns in the `DataFrame` of the target table
except those in \
+ this list will be in the output table.
:param stored_as_scd_type: The SCD type for the target table. Only 1 (or
"1") is supported. \
- When not specified the server default applies.
- :param name: The name of the flow for this create_auto_cdc_flow command.
When unspecified \
+ When not specified, the server default applies.
+ :param name: The name of the flow for this create_auto_cdc_flow command.
When unspecified, \
this will build a "default flow" with name equal to the target name.
"""
+ # Lazy import: pyspark.sql.connect.functions.builtin transitively imports
grpc, which is
+ # not available in the docs-build environment. pyspark.pipelines.api is
loaded eagerly
+ # from pyspark.pipelines.__init__, so a top-level import here would break
docs CI.
from pyspark.sql.connect.functions.builtin import expr as _connect_expr
if type(target) is not str:
@@ -690,6 +693,7 @@ def _normalize_column_list(
arg_name: str,
column_list: Union[List[str], List[Column]],
) -> List[Column]:
+ # Lazy import: see comment in create_auto_cdc_flow.
from pyspark.sql.connect.functions.builtin import col as _connect_col
if not isinstance(column_list, list):
diff --git a/python/pyspark/pipelines/flow.py b/python/pyspark/pipelines/flow.py
index 02e971aedd87..b1922454a551 100644
--- a/python/pyspark/pipelines/flow.py
+++ b/python/pyspark/pipelines/flow.py
@@ -56,10 +56,11 @@ class AutoCdcFlow:
:param source: The name of the CDC source to stream from.
:param keys: Column(s) that uniquely identify a row in source and target
data.
:param sequence_by: Expression used to order the source data.
- :param apply_as_deletes: Optional delete condition for the merged
operation.
+ :param apply_as_deletes: Optional delete condition for the merge operation.
:param column_list: Optional columns to include in the output table.
:param except_column_list: Optional columns to exclude from the output
table.
- :param stored_as_scd_type: Optional SCD type for the target table. Only 1
is supported.
+ :param stored_as_scd_type: Optional SCD type for the target table. Only 1
(or "1") is \
+ supported.
:param source_code_location: The location of the source code that created
this flow.
"""
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 0035f442fb00..0656a7eb91b0 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.autocdc
import org.apache.spark.SparkException
-import org.apache.spark.sql.{functions => F, AnalysisException}
+import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
@@ -130,9 +130,6 @@ case class Scd1BatchProcessor(
*/
private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
validatedMicrobatch: DataFrame): DataFrame = {
- // Proactively validate the reserved CDC metadata column does not exist in
the microbatch.
- validateCdcMetadataColumnNotPresent(validatedMicrobatch)
-
val rowDeleteSequence: Column = changeArgs.deleteCondition match {
case Some(deleteCondition) =>
F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
@@ -409,25 +406,6 @@ case class Scd1BatchProcessor(
.insert(columnsToInsertOnNewKey)
.merge()
}
-
- private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit
= {
- val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
- val resolver = microbatchSqlConf.resolver
-
- microbatch.schema.fieldNames
- .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
- .foreach { conflictingColumnName =>
- throw new AnalysisException(
- errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
- messageParameters = Map(
- "caseSensitivity" ->
CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
- "columnName" -> conflictingColumnName,
- "schemaName" -> "microbatch",
- "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
- )
- )
- }
- }
}
object Scd1BatchProcessor {
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index 740533d7504e..f88b0cd3a1cb 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -137,10 +137,10 @@ sealed trait UnresolvedFlow extends Flow {
* An [[UnresolvedFlow]] whose execution-type has not yet been determined.
*
* In some cases, we know the execution-type for an [[UnresolvedFlow]] even
before flow analysis
- * and resolution. For example an AutoCDCFlow is a special
unresolved-but-typed flow; we know a
- * flow will be an AutoCDC flow immediately on construction, because it has
its own special
- * registration API. Such flows are considered "typed flows", but there isn't
any semantic reason
- * yet to explicitly introduce a `TypedFlow` trait/class.
+ * and resolution. For example, an [[AutoCdcFlow]] is a special
unresolved-but-typed flow; we
+ * know a flow will be an AutoCDC flow immediately on construction, because it
has its own
+ * special registration API. Such flows are considered "typed flows", but
there isn't any
+ * semantic reason yet to explicitly introduce a `TypedFlow` trait/class.
*/
case class UntypedFlow(
identifier: TableIdentifier,
@@ -161,17 +161,16 @@ case class UntypedFlow(
* [[AutoCdcFlow]] is a typed flow because it is only supported for streaming,
and not as a once
* flow. Therefore by definition it is a streaming-type flow.
*
- * In the future once-support for [[AutoCdcFlow]] may be added.
+ * In the future, support for once-mode [[AutoCdcFlow]] may be added.
*/
case class AutoCdcFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
- sqlConf: Map[String, String] = Map.empty,
- comment: Option[String] = None,
override val origin: QueryOrigin,
- changeArgs: ChangeArgs
+ changeArgs: ChangeArgs,
+ sqlConf: Map[String, String] = Map.empty
) extends UnresolvedFlow {
override val once: Boolean = false
@@ -245,8 +244,8 @@ class AppendOnceFlow(
}
/**
- * A resolved flow that applies a CDC event stream to a target table via
MERGE, in accordance to
- * the configured [[flow.changeArgs]].
+ * A resolved flow that applies a CDC event stream to a target table via
MERGE, in accordance
+ * with the configured [[flow.changeArgs]].
*/
class AutoCdcMergeFlow(
val flow: AutoCdcFlow,
@@ -264,8 +263,8 @@ class AutoCdcMergeFlow(
columnSelection = changeArgs.columnSelection,
caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
)
- // AutoCDC flows require all key columns to be present in the target
table, to adhere to SCD
- // semantics.
+ // AutoCDC flows require all key columns to be present in the
user-selected source schema,
+ // so that they survive into the target table where SCD reconciliation
needs them.
requireKeysPresentInSelectedSchema(selectedSchema)
selectedSchema
}
@@ -305,11 +304,11 @@ class AutoCdcMergeFlow(
* Returns an empty dataframe whose schema matches
[[AutoCdcMergeFlow.schema]]. By construction,
* the returned dataframe will be a streaming dataframe.
*
- * In practice, [[AutoCdcMergeFlow.load]] is not invoked during graph
analysis or execution.
- * An AutoCdcMergeFlow can only be an input to a streaming table (not an MV
or
- * persisted/temp view), and streaming tables consume a
[[VirtualTableInput]] rather than the
- * producing [[Flow]] directly. [[VirtualTableInput]] overrides its own
[[load]] to do schema
- * inference on its input flows, rather than a transitive [[Flow.load]].
+ * Today, [[AutoCdcMergeFlow.load]] is not actually ever called during graph
analysis or
+ * execution. An AutoCdcMergeFlow can only be an input to a streaming table
(not an MV or
+ * persisted/temp view), and streaming tables take a [[VirtualTableInput]]
as input, not
+ * the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own
[[load]] to do
+ * schema inference on its input flows, rather than a transitive
[[ResolvedFlow.load]].
*
* The implementation exists for API consistency and throws an internal
error if invoked with
* `asStreaming = false`, or if the underlying source dataframe is not
streaming, to surface
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
index a80fdafd1c18..d56b95b5830b 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
@@ -35,7 +35,8 @@ trait GraphValidations extends Logging {
protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier,
Seq[Flow]] = {
val multiQueryTables = flowsTo.filter(_._2.size > 1)
- // A multiflow table may not have an AutoCDC flow; AutoCDC flow targets
must be single query.
+ // A multiflow table may not have an AutoCDC flow; AutoCDC targets must
have exactly one
+ // input flow.
multiQueryTables
.find { case (_, flows) => flows.exists(isAutoCdcFlow) }
.foreach {
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index 932110b94afd..cf7c9533bee9 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -74,7 +74,6 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
func: FlowFunction = noOpFlowFunction,
queryContext: QueryContext = testQueryContext,
sqlConf: Map[String, String] = Map.empty,
- comment: Option[String] = None,
origin: QueryOrigin = QueryOrigin.empty,
changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
AutoCdcFlow(
@@ -83,7 +82,6 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
func = func,
queryContext = queryContext,
sqlConf = sqlConf,
- comment = comment,
origin = origin,
changeArgs = changeArgs
)
@@ -91,8 +89,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
test("AutoCdcFlow exposes its constructor fields") {
val flow = newAutoCdcFlow(
- sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
- comment = Some("my CDC flow")
+ sqlConf = Map("spark.sql.shuffle.partitions" -> "8")
)
assert(flow.identifier == testIdentifier)
@@ -100,12 +97,11 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
assert(flow.func eq noOpFlowFunction)
assert(flow.queryContext == testQueryContext)
assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
- assert(flow.comment.contains("my CDC flow"))
assert(flow.origin == QueryOrigin.empty)
assert(flow.changeArgs == testChangeArgs)
}
- test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+ test("AutoCdcFlow defaults sqlConf to empty") {
// Confirms the case-class default values match the documented contract;
downstream
// registration code relies on `sqlConf` being a non-null empty map by
default so that
// `defaultSqlConf ++ flowDef.sqlConf` is well-defined in
[[GraphRegistrationContext]].
@@ -119,7 +115,6 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
)
assert(flow.sqlConf.isEmpty)
- assert(flow.comment.isEmpty)
}
test("AutoCdcFlow.once is always false") {
@@ -143,7 +138,6 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
assert(updated.destinationIdentifier == original.destinationIdentifier)
assert(updated.func eq original.func)
assert(updated.queryContext == original.queryContext)
- assert(updated.comment == original.comment)
assert(updated.origin == original.origin)
assert(updated.changeArgs == original.changeArgs)
// The original must not be mutated.
@@ -165,7 +159,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
sqlConf = Map.empty
)
- /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change
args. */
+ /** Builds an [[AutoCdcMergeFlow]] over the given source dataframe + change
args. */
private def newAutoCdcMergeFlow(
sourceDf: DataFrame,
keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
@@ -445,9 +439,9 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
"AutoCdcMergeFlow rejects a source df column whose name equals the
reserved CDC " +
"metadata column"
) {
- // Locks in the previous engine-level guard
(Scd1BatchProcessor.extendMicrobatchRowsWith
- // CdcMetadata) at flow-construction time. Any future regression where a
user-supplied
- // CDC stream carries the reserved metadata column name should fail
eagerly here.
+ // Locks in the previous engine-level guard at flow-construction time. Any
future
+ // regression where a user-supplied CDC stream carries the reserved
metadata column name
+ // should fail eagerly here.
val sourceDf =
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
checkError(
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index 8dad5019c0fe..6eda2afdcdb8 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -658,7 +658,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with
SharedSparkSession {
test(
"AutoCDC flow targeting a temporary view fails with
AUTOCDC_RELATION_FOR_TEMPORARY_VIEW"
) {
- // Temporary views in SDP normally accept either streaming or batch
producing flows, but
+ // Temporary views in SDP normally accept either streaming or
batch-producing flows, but
// AutoCDC flows are an explicit exception: SCD reconciliation only runs
at the
// streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an
AutoCDC flow at a view
// would silently drop reconciliation and expose just the projected CDF to
consumers.