This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 65336a0a1769 [SPARK-53810][SS][TESTS] Split large TWS python tests 
into multiple small tests to speedup the CI
65336a0a1769 is described below

commit 65336a0a17695b2b67082431efdd2ea7e179962e
Author: huanliwang-db <[email protected]>
AuthorDate: Wed Oct 8 10:38:28 2025 -0700

    [SPARK-53810][SS][TESTS] Split large TWS python tests into multiple small 
tests to speedup the CI
    
    ### What changes were proposed in this pull request?
    
    Split the large TWS (TransformWithState) Python tests into multiple smaller tests to speed up the CI.
    
    ### Why are the changes needed?
    
    CI is slow now; splitting the tests helps speed it up and makes them easier to debug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    The tests pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #52531 from huanliwang-db/huanliwang-db/split-test.
    
    Authored-by: huanliwang-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 dev/sparktestsupport/modules.py                    |  2 +
 .../test_parity_pandas_transform_with_state.py     | 31 -----------
 .../test_parity_transform_with_state_pyspark.py}   | 33 +----------
 .../pandas/test_pandas_transform_with_state.py     | 28 ----------
 ...st_pandas_transform_with_state_checkpoint_v2.py | 64 ++++++++++++++++++++++
 5 files changed, 67 insertions(+), 91 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index bc30c7740e39..2397bc9c7962 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -565,6 +565,7 @@ pyspark_sql = Module(
         "pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
         "pyspark.sql.tests.pandas.test_pandas_map",
         "pyspark.sql.tests.pandas.test_pandas_transform_with_state",
+        
"pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2",
         "pyspark.sql.tests.pandas.test_pandas_udf",
         "pyspark.sql.tests.pandas.test_pandas_udf_grouped_agg",
         "pyspark.sql.tests.pandas.test_pandas_udf_scalar",
@@ -1125,6 +1126,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.streaming.test_parity_listener",
         "pyspark.sql.tests.connect.streaming.test_parity_foreach",
         "pyspark.sql.tests.connect.streaming.test_parity_foreach_batch",
+        
"pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark",
         "pyspark.sql.tests.connect.test_resources",
         "pyspark.sql.tests.connect.shell.test_progress",
         "pyspark.sql.tests.connect.test_df_debug",
diff --git 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
 
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
index e772c2139326..334031ec362f 100644
--- 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
+++ 
b/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
@@ -18,7 +18,6 @@ import unittest
 
 from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
     TransformWithStateInPandasTestsMixin,
-    TransformWithStateInPySparkTestsMixin,
 )
 from pyspark import SparkConf
 from pyspark.testing.connectutils import ReusedConnectTestCase
@@ -54,36 +53,6 @@ class TransformWithStateInPandasParityTests(
         pass
 
 
-class TransformWithStateInPySparkParityTests(
-    TransformWithStateInPySparkTestsMixin, ReusedConnectTestCase
-):
-    """
-    Spark connect parity tests for TransformWithStateInPySpark. Run every test 
case in
-     `TransformWithStateInPySparkTestsMixin` in spark connect mode.
-    """
-
-    @classmethod
-    def conf(cls):
-        # Due to multiple inheritance from the same level, we need to 
explicitly setting configs in
-        # both TransformWithStateInPySparkTestsMixin and ReusedConnectTestCase 
here
-        cfg = SparkConf(loadDefaults=False)
-        for base in cls.__bases__:
-            if hasattr(base, "conf"):
-                parent_cfg = base.conf()
-                for k, v in parent_cfg.getAll():
-                    cfg.set(k, v)
-
-        # Extra removing config for connect suites
-        if cfg._jconf is not None:
-            cfg._jconf.remove("spark.master")
-
-        return cfg
-
-    @unittest.skip("Flaky in spark connect on CI. Skip for now. See 
SPARK-51368 for details.")
-    def test_schema_evolution_scenarios(self):
-        pass
-
-
 if __name__ == "__main__":
     from 
pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state import 
*  # noqa: F401,E501
 
diff --git 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
 
b/python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
similarity index 65%
copy from 
python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
copy to 
python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
index e772c2139326..7209b94a5060 100644
--- 
a/python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py
+++ 
b/python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py
@@ -17,43 +17,12 @@
 import unittest
 
 from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
-    TransformWithStateInPandasTestsMixin,
     TransformWithStateInPySparkTestsMixin,
 )
 from pyspark import SparkConf
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
-class TransformWithStateInPandasParityTests(
-    TransformWithStateInPandasTestsMixin, ReusedConnectTestCase
-):
-    """
-    Spark connect parity tests for TransformWithStateInPandas. Run every test 
case in
-     `TransformWithStateInPandasTestsMixin` in spark connect mode.
-    """
-
-    @classmethod
-    def conf(cls):
-        # Due to multiple inheritance from the same level, we need to 
explicitly setting configs in
-        # both TransformWithStateInPandasTestsMixin and ReusedConnectTestCase 
here
-        cfg = SparkConf(loadDefaults=False)
-        for base in cls.__bases__:
-            if hasattr(base, "conf"):
-                parent_cfg = base.conf()
-                for k, v in parent_cfg.getAll():
-                    cfg.set(k, v)
-
-        # Extra removing config for connect suites
-        if cfg._jconf is not None:
-            cfg._jconf.remove("spark.master")
-
-        return cfg
-
-    @unittest.skip("Flaky in spark connect on CI. Skip for now. See 
SPARK-51368 for details.")
-    def test_schema_evolution_scenarios(self):
-        pass
-
-
 class TransformWithStateInPySparkParityTests(
     TransformWithStateInPySparkTestsMixin, ReusedConnectTestCase
 ):
@@ -85,7 +54,7 @@ class TransformWithStateInPySparkParityTests(
 
 
 if __name__ == "__main__":
-    from 
pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state import 
*  # noqa: F401,E501
+    from 
pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark 
import *  # noqa: F401,E501
 
     try:
         import xmlrunner
diff --git 
a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py 
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 576c0cf6e6e1..d5b1c737386a 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -2011,22 +2011,6 @@ class 
TransformWithStateInPandasTestsMixin(TransformWithStateTestsMixin):
         return cfg
 
 
-class 
TransformWithStateInPandasWithCheckpointV2TestsMixin(TransformWithStateInPandasTestsMixin):
-    @classmethod
-    def conf(cls):
-        cfg = super().conf()
-        cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
-        return cfg
-
-
-class 
TransformWithStateInPySparkWithCheckpointV2TestsMixin(TransformWithStateInPySparkTestsMixin):
-    @classmethod
-    def conf(cls):
-        cfg = super().conf()
-        cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
-        return cfg
-
-
 class TransformWithStateInPandasTests(TransformWithStateInPandasTestsMixin, 
ReusedSQLTestCase):
     pass
 
@@ -2035,18 +2019,6 @@ class 
TransformWithStateInPySparkTests(TransformWithStateInPySparkTestsMixin, Re
     pass
 
 
-class TransformWithStateInPandasWithCheckpointV2Tests(
-    TransformWithStateInPandasWithCheckpointV2TestsMixin, ReusedSQLTestCase
-):
-    pass
-
-
-class TransformWithStateInPySparkWithCheckpointV2Tests(
-    TransformWithStateInPySparkWithCheckpointV2TestsMixin, ReusedSQLTestCase
-):
-    pass
-
-
 if __name__ == "__main__":
     from pyspark.sql.tests.pandas.test_pandas_transform_with_state import *  # 
noqa: F401
 
diff --git 
a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
 
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
new file mode 100644
index 000000000000..d6609c44db62
--- /dev/null
+++ 
b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state_checkpoint_v2.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
+    TransformWithStateInPandasTestsMixin,
+    TransformWithStateInPySparkTestsMixin,
+)
+
+
+class 
TransformWithStateInPandasWithCheckpointV2TestsMixin(TransformWithStateInPandasTestsMixin):
+    @classmethod
+    def conf(cls):
+        cfg = super().conf()
+        cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
+        return cfg
+
+
+class 
TransformWithStateInPySparkWithCheckpointV2TestsMixin(TransformWithStateInPySparkTestsMixin):
+    @classmethod
+    def conf(cls):
+        cfg = super().conf()
+        cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
+        return cfg
+
+
+class TransformWithStateInPandasWithCheckpointV2Tests(
+    TransformWithStateInPandasWithCheckpointV2TestsMixin, ReusedSQLTestCase
+):
+    pass
+
+
+class TransformWithStateInPySparkWithCheckpointV2Tests(
+    TransformWithStateInPySparkWithCheckpointV2TestsMixin, ReusedSQLTestCase
+):
+    pass
+
+
+if __name__ == "__main__":
+    from 
pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2 import 
*  # noqa: F401,E501
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to