This is an automated email from the ASF dual-hosted git repository.

gaogaotiantian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe02b31e5bd0 [SPARK-56651][CONNECT][SDP][FOLLOWUP] Move Auto CDC flow 
tests into a dedicated test file
fe02b31e5bd0 is described below

commit fe02b31e5bd0028459032173ae3eda8aab443d91
Author: Tian Gao <[email protected]>
AuthorDate: Tue Jun 2 10:47:23 2026 -0700

    [SPARK-56651][CONNECT][SDP][FOLLOWUP] Move Auto CDC flow tests into a dedicated test file
    
    ### What changes were proposed in this pull request?
    
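    Move the Auto CDC related tests out of test_graph_element_registry.py into a new, dedicated file, test_auto_cdc_flow.py, and register the new test module in dev/sparktestsupport/modules.py.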
    
    ### Why are the changes needed?
    
    1. Auto CDC is a standalone feature and should not be tested in
    test_graph_element_registry.py.
    2. Auto CDC requires Spark Connect, so its tests must be guarded and
    skipped when the Connect dependencies are not available.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The tests passed locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, Claude Code (Opus 4.8)
    
    Closes #56259 from gaogaotiantian/SPARK-extract-autocdc-tests.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Tian Gao <[email protected]>
---
 dev/sparktestsupport/modules.py                    |   1 +
 ...h_element_registry.py => test_auto_cdc_flow.py} | 130 +++--------------
 .../pipelines/tests/test_graph_element_registry.py | 159 +--------------------
 3 files changed, 19 insertions(+), 271 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 7a4e50ff4141..d914f04b5027 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1590,6 +1590,7 @@ pyspark_pipelines = Module(
     source_file_regexes=["python/pyspark/pipelines"],
     python_test_goals=[
         "pyspark.pipelines.tests.test_add_pipeline_analysis_context",
+        "pyspark.pipelines.tests.test_auto_cdc_flow",
         "pyspark.pipelines.tests.test_block_session_mutations",
         "pyspark.pipelines.tests.test_cli",
         "pyspark.pipelines.tests.test_decorators",
diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py 
b/python/pyspark/pipelines/tests/test_auto_cdc_flow.py
similarity index 61%
copy from python/pyspark/pipelines/tests/test_graph_element_registry.py
copy to python/pyspark/pipelines/tests/test_auto_cdc_flow.py
index fd8ed439b130..c52e8a09e8a6 100644
--- a/python/pyspark/pipelines/tests/test_graph_element_registry.py
+++ b/python/pyspark/pipelines/tests/test_auto_cdc_flow.py
@@ -16,90 +16,25 @@
 #
 
 import unittest
-
-from pyspark.errors import PySparkException, PySparkTypeError
-from pyspark.pipelines.graph_element_registry import 
graph_element_registration_context
-from pyspark import pipelines as dp
-from pyspark.pipelines.flow import AutoCdcFlow
-from pyspark.pipelines.output import Sink
-from pyspark.pipelines.tests.local_graph_element_registry import 
LocalGraphElementRegistry
-from pyspark.sql import Column
-from pyspark.sql.connect.functions.builtin import col, expr
 from typing import cast
 
+from pyspark.errors import PySparkRuntimeError, PySparkTypeError
+from pyspark.sql import Column
+from pyspark.testing.connectutils import (
+    should_test_connect,
+    connect_requirement_message,
+)
 
-class GraphElementRegistryTest(unittest.TestCase):
-    def test_graph_element_registry(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-
-            @dp.materialized_view
-            def mv():
-                raise NotImplementedError()
-
-            @dp.table
-            def st():
-                raise NotImplementedError()
-
-            dp.create_streaming_table("st2")
-
-            @dp.append_flow(target="st2")
-            def flow1():
-                raise NotImplementedError()
-
-            @dp.append_flow(target="st2")
-            def flow2():
-                raise NotImplementedError()
-
-            dp.create_sink(
-                name="sink",
-                format="parquet",
-                options={
-                    "key1": "value1",
-                },
-            )
-
-        self.assertEqual(len(registry.outputs), 4)
-        self.assertEqual(len(registry.flows), 4)
-
-        mv_obj = registry.outputs[0]
-        self.assertEqual(mv_obj.name, "mv")
-        assert 
mv_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        mv_flow_obj = registry.flows[0]
-        self.assertEqual(mv_flow_obj.name, "mv")
-        self.assertEqual(mv_flow_obj.target, "mv")
-        assert 
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        st_obj = registry.outputs[1]
-        self.assertEqual(st_obj.name, "st")
-        assert 
st_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        st_flow_obj = registry.flows[1]
-        self.assertEqual(st_flow_obj.name, "st")
-        self.assertEqual(st_flow_obj.target, "st")
-        assert 
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        st2_obj = registry.outputs[2]
-        self.assertEqual(st2_obj.name, "st2")
-        assert 
st2_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        st2_flow1_obj = registry.flows[2]
-        self.assertEqual(st2_flow1_obj.name, "flow1")
-        self.assertEqual(st2_flow1_obj.target, "st2")
-        assert 
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-        st2_flow1_obj = registry.flows[3]
-        self.assertEqual(st2_flow1_obj.name, "flow2")
-        self.assertEqual(st2_flow1_obj.target, "st2")
-        assert 
mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+if should_test_connect:
+    from pyspark import pipelines as dp
+    from pyspark.pipelines.graph_element_registry import 
graph_element_registration_context
+    from pyspark.pipelines.flow import AutoCdcFlow
+    from pyspark.pipelines.tests.local_graph_element_registry import 
LocalGraphElementRegistry
+    from pyspark.sql.connect.functions.builtin import col, expr
 
-        sink_obj = cast(Sink, registry.outputs[3])
-        self.assertEqual(sink_obj.name, "sink")
-        self.assertEqual(sink_obj.format, "parquet")
-        self.assertEqual(sink_obj.options["key1"], "value1")
-        assert 
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
 
[email protected](not should_test_connect, connect_requirement_message)
+class AutoCdcFlowConstructionTest(unittest.TestCase):
     def test_create_auto_cdc_flow(self):
         registry = LocalGraphElementRegistry()
         with graph_element_registration_context(registry):
@@ -122,7 +57,7 @@ class GraphElementRegistryTest(unittest.TestCase):
         self.assertEqual(flow.name, "target")
         self.assertIsNone(flow.stored_as_scd_type)
         self.assertIsNone(flow.apply_as_deletes)
-        assert 
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+        assert 
flow.source_code_location.filename.endswith("test_auto_cdc_flow.py")
 
     def test_create_auto_cdc_flow_with_all_args(self):
         registry = LocalGraphElementRegistry()
@@ -241,39 +176,8 @@ class GraphElementRegistryTest(unittest.TestCase):
                 )
             self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
 
-    def test_definition_without_graph_element_registry(self):
-        for decorator in [dp.table, dp.temporary_view, dp.materialized_view]:
-            with self.assertRaises(PySparkException) as context:
-
-                @decorator
-                def a():
-                    raise NotImplementedError()
-
-            self.assertEqual(
-                context.exception.getCondition(),
-                "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
-            )
-
-        with self.assertRaises(PySparkException) as context:
-            dp.create_streaming_table("st")
-
-        self.assertEqual(
-            context.exception.getCondition(),
-            "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
-        )
-
-        with self.assertRaises(PySparkException) as context:
-
-            @dp.append_flow(target="st")
-            def b():
-                raise NotImplementedError()
-
-        self.assertEqual(
-            context.exception.getCondition(),
-            "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
-        )
-
-        with self.assertRaises(PySparkException) as context:
+    def test_create_auto_cdc_flow_without_registry(self):
+        with self.assertRaises(PySparkRuntimeError) as context:
             dp.create_auto_cdc_flow(
                 target="t",
                 source="s",
diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py 
b/python/pyspark/pipelines/tests/test_graph_element_registry.py
index fd8ed439b130..1e6fcf224a0a 100644
--- a/python/pyspark/pipelines/tests/test_graph_element_registry.py
+++ b/python/pyspark/pipelines/tests/test_graph_element_registry.py
@@ -17,14 +17,11 @@
 
 import unittest
 
-from pyspark.errors import PySparkException, PySparkTypeError
+from pyspark.errors import PySparkException
 from pyspark.pipelines.graph_element_registry import 
graph_element_registration_context
 from pyspark import pipelines as dp
-from pyspark.pipelines.flow import AutoCdcFlow
 from pyspark.pipelines.output import Sink
 from pyspark.pipelines.tests.local_graph_element_registry import 
LocalGraphElementRegistry
-from pyspark.sql import Column
-from pyspark.sql.connect.functions.builtin import col, expr
 from typing import cast
 
 
@@ -100,147 +97,6 @@ class GraphElementRegistryTest(unittest.TestCase):
         self.assertEqual(sink_obj.options["key1"], "value1")
         assert 
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
 
-    def test_create_auto_cdc_flow(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("target")
-            dp.create_auto_cdc_flow(
-                target="target",
-                source="source",
-                keys=[col("key")],
-                sequence_by=expr("seq"),
-            )
-
-        self.assertEqual(len(registry.outputs), 1)
-        self.assertEqual(len(registry.auto_cdc_flows), 1)
-
-        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
-        self.assertEqual(flow.target, "target")
-        self.assertEqual(flow.source, "source")
-
-        # When name is not specified, it inherits the target's name at 
construction time.
-        self.assertEqual(flow.name, "target")
-        self.assertIsNone(flow.stored_as_scd_type)
-        self.assertIsNone(flow.apply_as_deletes)
-        assert 
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
-
-    def test_create_auto_cdc_flow_with_all_args(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("tgt")
-            dp.create_auto_cdc_flow(
-                target="tgt",
-                source="src",
-                keys=[col("id")],
-                sequence_by=expr("ts"),
-                apply_as_deletes=expr("op = 'DELETE'"),
-                column_list=[col("id"), col("val")],
-                stored_as_scd_type=1,
-                name="my_flow",
-            )
-
-        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
-        self.assertEqual(flow.name, "my_flow")
-        self.assertEqual(flow.stored_as_scd_type, 1)
-
-    def test_create_auto_cdc_flow_with_string_args(self):
-        # Verify that string forms of column / expression arguments are 
normalized to
-        # PySpark Columns, equivalent to passing col(...) / expr(...) directly.
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("tgt")
-            dp.create_auto_cdc_flow(
-                target="tgt",
-                source="src",
-                keys=["id"],
-                sequence_by="ts",
-                apply_as_deletes="op = 'DELETE'",
-                column_list=["id", "val"],
-            )
-
-        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
-        for k in flow.keys:
-            self.assertIsInstance(k, Column)
-        self.assertIsInstance(flow.sequence_by, Column)
-        self.assertIsInstance(flow.apply_as_deletes, Column)
-        assert flow.column_list is not None
-        for c in flow.column_list:
-            self.assertIsInstance(c, Column)
-
-    def test_create_auto_cdc_flow_stored_as_scd_type_string(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("t")
-            dp.create_auto_cdc_flow(
-                target="t",
-                source="s",
-                keys=[col("k")],
-                sequence_by=expr("seq"),
-                stored_as_scd_type="1",
-            )
-
-        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
-        self.assertEqual(flow.stored_as_scd_type, "1")
-
-    def test_create_auto_cdc_flow_invalid_scd_type(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("t")
-            with self.assertRaises(PySparkTypeError) as ctx:
-                dp.create_auto_cdc_flow(
-                    target="t",
-                    source="s",
-                    keys=[col("k")],
-                    sequence_by=expr("seq"),
-                    stored_as_scd_type=2,  # type: ignore[arg-type]
-                )
-            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
-    def test_create_auto_cdc_flow_with_except_column_list(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("tgt")
-            dp.create_auto_cdc_flow(
-                target="tgt",
-                source="src",
-                keys=[col("id")],
-                sequence_by=expr("ts"),
-                except_column_list=["op", "ts"],
-            )
-
-        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
-        self.assertIsNone(flow.column_list)
-        assert flow.except_column_list is not None
-        self.assertEqual(len(flow.except_column_list), 2)
-        for c in flow.except_column_list:
-            self.assertIsInstance(c, Column)
-
-    def test_create_auto_cdc_flow_rejects_non_str_target(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("tgt")
-            with self.assertRaises(PySparkTypeError) as ctx:
-                dp.create_auto_cdc_flow(
-                    target=123,  # type: ignore[arg-type]
-                    source="src",
-                    keys=[col("id")],
-                    sequence_by=expr("ts"),
-                )
-            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
-    def test_create_auto_cdc_flow_rejects_invalid_key_element(self):
-        registry = LocalGraphElementRegistry()
-        with graph_element_registration_context(registry):
-            dp.create_streaming_table("tgt")
-            with self.assertRaises(PySparkTypeError) as ctx:
-                dp.create_auto_cdc_flow(
-                    target="tgt",
-                    source="src",
-                    keys=[123],  # type: ignore[list-item]
-                    sequence_by=expr("ts"),
-                )
-            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
-
     def test_definition_without_graph_element_registry(self):
         for decorator in [dp.table, dp.temporary_view, dp.materialized_view]:
             with self.assertRaises(PySparkException) as context:
@@ -273,19 +129,6 @@ class GraphElementRegistryTest(unittest.TestCase):
             "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
         )
 
-        with self.assertRaises(PySparkException) as context:
-            dp.create_auto_cdc_flow(
-                target="t",
-                source="s",
-                keys=["k"],
-                sequence_by="seq",
-            )
-
-        self.assertEqual(
-            context.exception.getCondition(),
-            "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
-        )
-
 
 if __name__ == "__main__":
     from pyspark.testing import main


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to