This is an automated email from the ASF dual-hosted git repository.
gaogaotiantian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fe02b31e5bd0 [SPARK-56651][CONNECT][SDP][FOLLOWUP] Move Auto CDC flow
tests into a dedicated test file
fe02b31e5bd0 is described below
commit fe02b31e5bd0028459032173ae3eda8aab443d91
Author: Tian Gao <[email protected]>
AuthorDate: Tue Jun 2 10:47:23 2026 -0700
[SPARK-56651][CONNECT][SDP][FOLLOWUP] Move Auto CDC flow tests into a
dedicated test file
### What changes were proposed in this pull request?
Move the Auto CDC flow tests out of `test_graph_element_registry.py` into a new dedicated test file, `test_auto_cdc_flow.py`.
### Why are the changes needed?
1. Auto CDC is a standalone feature, so its tests do not belong in
`test_graph_element_registry.py`.
2. Auto CDC requires Spark Connect, so its test cases must be guarded and skipped when Connect is unavailable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The moved and updated tests passed locally.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude Code (Opus 4.8)
Closes #56259 from gaogaotiantian/SPARK-extract-autocdc-tests.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Tian Gao <[email protected]>
---
dev/sparktestsupport/modules.py | 1 +
...h_element_registry.py => test_auto_cdc_flow.py} | 130 +++--------------
.../pipelines/tests/test_graph_element_registry.py | 159 +--------------------
3 files changed, 19 insertions(+), 271 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 7a4e50ff4141..d914f04b5027 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1590,6 +1590,7 @@ pyspark_pipelines = Module(
source_file_regexes=["python/pyspark/pipelines"],
python_test_goals=[
"pyspark.pipelines.tests.test_add_pipeline_analysis_context",
+ "pyspark.pipelines.tests.test_auto_cdc_flow",
"pyspark.pipelines.tests.test_block_session_mutations",
"pyspark.pipelines.tests.test_cli",
"pyspark.pipelines.tests.test_decorators",
diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py
b/python/pyspark/pipelines/tests/test_auto_cdc_flow.py
similarity index 61%
copy from python/pyspark/pipelines/tests/test_graph_element_registry.py
copy to python/pyspark/pipelines/tests/test_auto_cdc_flow.py
index fd8ed439b130..c52e8a09e8a6 100644
--- a/python/pyspark/pipelines/tests/test_graph_element_registry.py
+++ b/python/pyspark/pipelines/tests/test_auto_cdc_flow.py
@@ -16,90 +16,25 @@
#
import unittest
-
-from pyspark.errors import PySparkException, PySparkTypeError
-from pyspark.pipelines.graph_element_registry import
graph_element_registration_context
-from pyspark import pipelines as dp
-from pyspark.pipelines.flow import AutoCdcFlow
-from pyspark.pipelines.output import Sink
-from pyspark.pipelines.tests.local_graph_element_registry import
LocalGraphElementRegistry
-from pyspark.sql import Column
-from pyspark.sql.connect.functions.builtin import col, expr
from typing import cast
+from pyspark.errors import PySparkRuntimeError, PySparkTypeError
+from pyspark.sql import Column
+from pyspark.testing.connectutils import (
+ should_test_connect,
+ connect_requirement_message,
+)
-class GraphElementRegistryTest(unittest.TestCase):
- def test_graph_element_registry(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
-
- @dp.materialized_view
- def mv():
- raise NotImplementedError()
-
- @dp.table
- def st():
- raise NotImplementedError()
-
- dp.create_streaming_table("st2")
-
- @dp.append_flow(target="st2")
- def flow1():
- raise NotImplementedError()
-
- @dp.append_flow(target="st2")
- def flow2():
- raise NotImplementedError()
-
- dp.create_sink(
- name="sink",
- format="parquet",
- options={
- "key1": "value1",
- },
- )
-
- self.assertEqual(len(registry.outputs), 4)
- self.assertEqual(len(registry.flows), 4)
-
- mv_obj = registry.outputs[0]
- self.assertEqual(mv_obj.name, "mv")
- assert
mv_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- mv_flow_obj = registry.flows[0]
- self.assertEqual(mv_flow_obj.name, "mv")
- self.assertEqual(mv_flow_obj.target, "mv")
- assert
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- st_obj = registry.outputs[1]
- self.assertEqual(st_obj.name, "st")
- assert
st_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- st_flow_obj = registry.flows[1]
- self.assertEqual(st_flow_obj.name, "st")
- self.assertEqual(st_flow_obj.target, "st")
- assert
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- st2_obj = registry.outputs[2]
- self.assertEqual(st2_obj.name, "st2")
- assert
st2_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- st2_flow1_obj = registry.flows[2]
- self.assertEqual(st2_flow1_obj.name, "flow1")
- self.assertEqual(st2_flow1_obj.target, "st2")
- assert
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- st2_flow1_obj = registry.flows[3]
- self.assertEqual(st2_flow1_obj.name, "flow2")
- self.assertEqual(st2_flow1_obj.target, "st2")
- assert
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+if should_test_connect:
+ from pyspark import pipelines as dp
+ from pyspark.pipelines.graph_element_registry import
graph_element_registration_context
+ from pyspark.pipelines.flow import AutoCdcFlow
+ from pyspark.pipelines.tests.local_graph_element_registry import
LocalGraphElementRegistry
+ from pyspark.sql.connect.functions.builtin import col, expr
- sink_obj = cast(Sink, registry.outputs[3])
- self.assertEqual(sink_obj.name, "sink")
- self.assertEqual(sink_obj.format, "parquet")
- self.assertEqual(sink_obj.options["key1"], "value1")
- assert
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
[email protected](not should_test_connect, connect_requirement_message)
+class AutoCdcFlowConstructionTest(unittest.TestCase):
def test_create_auto_cdc_flow(self):
registry = LocalGraphElementRegistry()
with graph_element_registration_context(registry):
@@ -122,7 +57,7 @@ class GraphElementRegistryTest(unittest.TestCase):
self.assertEqual(flow.name, "target")
self.assertIsNone(flow.stored_as_scd_type)
self.assertIsNone(flow.apply_as_deletes)
- assert
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+ assert
flow.source_code_location.filename.endswith("test_auto_cdc_flow.py")
def test_create_auto_cdc_flow_with_all_args(self):
registry = LocalGraphElementRegistry()
@@ -241,39 +176,8 @@ class GraphElementRegistryTest(unittest.TestCase):
)
self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
- def test_definition_without_graph_element_registry(self):
- for decorator in [dp.table, dp.temporary_view, dp.materialized_view]:
- with self.assertRaises(PySparkException) as context:
-
- @decorator
- def a():
- raise NotImplementedError()
-
- self.assertEqual(
- context.exception.getCondition(),
- "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
- )
-
- with self.assertRaises(PySparkException) as context:
- dp.create_streaming_table("st")
-
- self.assertEqual(
- context.exception.getCondition(),
- "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
- )
-
- with self.assertRaises(PySparkException) as context:
-
- @dp.append_flow(target="st")
- def b():
- raise NotImplementedError()
-
- self.assertEqual(
- context.exception.getCondition(),
- "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
- )
-
- with self.assertRaises(PySparkException) as context:
+ def test_create_auto_cdc_flow_without_registry(self):
+ with self.assertRaises(PySparkRuntimeError) as context:
dp.create_auto_cdc_flow(
target="t",
source="s",
diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py
b/python/pyspark/pipelines/tests/test_graph_element_registry.py
index fd8ed439b130..1e6fcf224a0a 100644
--- a/python/pyspark/pipelines/tests/test_graph_element_registry.py
+++ b/python/pyspark/pipelines/tests/test_graph_element_registry.py
@@ -17,14 +17,11 @@
import unittest
-from pyspark.errors import PySparkException, PySparkTypeError
+from pyspark.errors import PySparkException
from pyspark.pipelines.graph_element_registry import
graph_element_registration_context
from pyspark import pipelines as dp
-from pyspark.pipelines.flow import AutoCdcFlow
from pyspark.pipelines.output import Sink
from pyspark.pipelines.tests.local_graph_element_registry import
LocalGraphElementRegistry
-from pyspark.sql import Column
-from pyspark.sql.connect.functions.builtin import col, expr
from typing import cast
@@ -100,147 +97,6 @@ class GraphElementRegistryTest(unittest.TestCase):
self.assertEqual(sink_obj.options["key1"], "value1")
assert
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
- def test_create_auto_cdc_flow(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("target")
- dp.create_auto_cdc_flow(
- target="target",
- source="source",
- keys=[col("key")],
- sequence_by=expr("seq"),
- )
-
- self.assertEqual(len(registry.outputs), 1)
- self.assertEqual(len(registry.auto_cdc_flows), 1)
-
- flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
- self.assertEqual(flow.target, "target")
- self.assertEqual(flow.source, "source")
-
- # When name is not specified, it inherits the target's name at
construction time.
- self.assertEqual(flow.name, "target")
- self.assertIsNone(flow.stored_as_scd_type)
- self.assertIsNone(flow.apply_as_deletes)
- assert
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
-
- def test_create_auto_cdc_flow_with_all_args(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("tgt")
- dp.create_auto_cdc_flow(
- target="tgt",
- source="src",
- keys=[col("id")],
- sequence_by=expr("ts"),
- apply_as_deletes=expr("op = 'DELETE'"),
- column_list=[col("id"), col("val")],
- stored_as_scd_type=1,
- name="my_flow",
- )
-
- flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
- self.assertEqual(flow.name, "my_flow")
- self.assertEqual(flow.stored_as_scd_type, 1)
-
- def test_create_auto_cdc_flow_with_string_args(self):
- # Verify that string forms of column / expression arguments are
normalized to
- # PySpark Columns, equivalent to passing col(...) / expr(...) directly.
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("tgt")
- dp.create_auto_cdc_flow(
- target="tgt",
- source="src",
- keys=["id"],
- sequence_by="ts",
- apply_as_deletes="op = 'DELETE'",
- column_list=["id", "val"],
- )
-
- flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
- for k in flow.keys:
- self.assertIsInstance(k, Column)
- self.assertIsInstance(flow.sequence_by, Column)
- self.assertIsInstance(flow.apply_as_deletes, Column)
- assert flow.column_list is not None
- for c in flow.column_list:
- self.assertIsInstance(c, Column)
-
- def test_create_auto_cdc_flow_stored_as_scd_type_string(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("t")
- dp.create_auto_cdc_flow(
- target="t",
- source="s",
- keys=[col("k")],
- sequence_by=expr("seq"),
- stored_as_scd_type="1",
- )
-
- flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
- self.assertEqual(flow.stored_as_scd_type, "1")
-
- def test_create_auto_cdc_flow_invalid_scd_type(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("t")
- with self.assertRaises(PySparkTypeError) as ctx:
- dp.create_auto_cdc_flow(
- target="t",
- source="s",
- keys=[col("k")],
- sequence_by=expr("seq"),
- stored_as_scd_type=2, # type: ignore[arg-type]
- )
- self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
- def test_create_auto_cdc_flow_with_except_column_list(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("tgt")
- dp.create_auto_cdc_flow(
- target="tgt",
- source="src",
- keys=[col("id")],
- sequence_by=expr("ts"),
- except_column_list=["op", "ts"],
- )
-
- flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
- self.assertIsNone(flow.column_list)
- assert flow.except_column_list is not None
- self.assertEqual(len(flow.except_column_list), 2)
- for c in flow.except_column_list:
- self.assertIsInstance(c, Column)
-
- def test_create_auto_cdc_flow_rejects_non_str_target(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("tgt")
- with self.assertRaises(PySparkTypeError) as ctx:
- dp.create_auto_cdc_flow(
- target=123, # type: ignore[arg-type]
- source="src",
- keys=[col("id")],
- sequence_by=expr("ts"),
- )
- self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
- def test_create_auto_cdc_flow_rejects_invalid_key_element(self):
- registry = LocalGraphElementRegistry()
- with graph_element_registration_context(registry):
- dp.create_streaming_table("tgt")
- with self.assertRaises(PySparkTypeError) as ctx:
- dp.create_auto_cdc_flow(
- target="tgt",
- source="src",
- keys=[123], # type: ignore[list-item]
- sequence_by=expr("ts"),
- )
- self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
def test_definition_without_graph_element_registry(self):
for decorator in [dp.table, dp.temporary_view, dp.materialized_view]:
with self.assertRaises(PySparkException) as context:
@@ -273,19 +129,6 @@ class GraphElementRegistryTest(unittest.TestCase):
"GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
)
- with self.assertRaises(PySparkException) as context:
- dp.create_auto_cdc_flow(
- target="t",
- source="s",
- keys=["k"],
- sequence_by="seq",
- )
-
- self.assertEqual(
- context.exception.getCondition(),
- "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
- )
-
if __name__ == "__main__":
from pyspark.testing import main
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]