This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 564514899ca7 Revert "[SPARK-51758][SS][FOLLOWUP][TESTS] Fix flaky test around watermark due to additional batch causing empty df"
564514899ca7 is described below

commit 564514899ca7f4479133ee46fda96201c6e24875
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Apr 17 19:03:34 2025 +0900

    Revert "[SPARK-51758][SS][FOLLOWUP][TESTS] Fix flaky test around watermark due to additional batch causing empty df"
    
    Reverts https://github.com/apache/spark/pull/50615.
    
    Tested manually.
    
    On master (before the revert):
    ```
    (spark_313) ➜  spark git:(master) python/run-tests -k --python-executables python3 --testnames 'pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time'
    Running PySpark tests. Output is in /Users/ruifeng.zheng/Dev/spark/python/unit-tests.log
    Will test against the following Python executables: ['python3']
    Will test the following Python tests: ['pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time']
    python3 python_implementation is CPython
    python3 version is: Python 3.13.3
    Starting test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (temp output: /Users/ruifeng.zheng/Dev/spark/python/target/75f38dd9-30c8-4122-b88f-047c33bff8ee/python3__pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state_TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time__1ec968gn.log)
    
    Running tests...
    ----------------------------------------------------------------------
    WARNING: Using incubator modules: jdk.incubator.vector
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamDuration to Some(1s) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.execute.reattachable.senderMaxStreamDuration".
    See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
      warnings.warn(warn)
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamSize to Some(123) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.execute.reattachable.senderMaxStreamSize".
    See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
      warnings.warn(warn)
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.authenticate.token to Some(deadbeef) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.authenticate.token".
    See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
      warnings.warn(warn)
      test_transform_with_state_with_wmark_and_non_event_time (pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time) ... ERROR (4.823s)
    
    ======================================================================
    ERROR [4.823s]: test_transform_with_state_with_wmark_and_non_event_time (pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/streaming/readwriter.py", line 587, in foreachBatch
        self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps(
                                                                  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
            func
            ^^^^
        )
        ^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/serializers.py", line 460, in dumps
        return cloudpickle.dumps(obj, pickle_protocol)
               ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 1537, in dumps
        cp.dump(obj)
        ~~~~~~~^^^^^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 1303, in dump
        return super().dump(obj)
               ~~~~~~~~~~~~^^^^^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 966, in _file_reduce
        raise pickle.PicklingError(
            "Cannot pickle files that are not opened for reading: %s" % obj.mode
        )
    _pickle.PicklingError: Cannot pickle files that are not opened for reading: w
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 624, in test_transform_with_state_with_wmark_and_non_event_time
        self._test_transform_with_state_in_pandas_event_time(
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
            MinEventTimeStatefulProcessor(), check_results, "None"
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        )
        ^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 561, in _test_transform_with_state_in_pandas_event_time
        .foreachBatch(check_results)
         ~~~~~~~~~~~~^^^^^^^^^^^^^^^
      File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/streaming/readwriter.py", line 591, in foreachBatch
        raise PySparkPicklingError(
        ...<2 lines>...
        )
    pyspark.errors.exceptions.base.PySparkPicklingError: [STREAMING_CONNECT_SERIALIZATION_ERROR] Cannot serialize the function `foreachBatch`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQuer [...]
    
    ----------------------------------------------------------------------
    Ran 1 test in 8.169s
    
    FAILED (errors=1)
    
    Generating XML reports...
    Generated XML report: target/test-reports/TEST-pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests-20250417173545.xml
    
    Had test failures in pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time with python3; see logs.
    ```
    
    After the revert:
    ```
    (spark_313) ➜  spark git:(test_test) python/run-tests -k --python-executables python3 --testnames 'pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time'
    Running PySpark tests. Output is in /Users/ruifeng.zheng/Dev/spark/python/unit-tests.log
    Will test against the following Python executables: ['python3']
    Will test the following Python tests: ['pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time']
    python3 python_implementation is CPython
    python3 version is: Python 3.13.3
    Starting test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (temp output: /Users/ruifeng.zheng/Dev/spark/python/target/9c5f324b-d914-4a50-bdf6-20fcfa9474c7/python3__pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state_TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time__fkrm_xmh.log)
    Finished test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (51s)
    Tests passed in 51 seconds
    
    ```
    
    Closes #50621 from zhengruifeng/test_test.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 25bf063d20e6..d5fcb7fdf91d 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -612,14 +612,11 @@ class TransformWithStateInPandasTestsMixin:
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="4"),
                 }
-            elif batch_id == 2:
+            else:
                 # watermark for late event = 10 and min event = 2 with no filtering
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="2"),
                 }
-            else:
-                for q in self.spark.streams.active:
-                    q.stop()
 
         self._test_transform_with_state_in_pandas_event_time(
             MinEventTimeStatefulProcessor(), check_results, "None"
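
Note on the failure logged above: the reverted branch referenced `self.spark` inside `check_results`, so under Spark Connect the `foreachBatch` callback could no longer be serialized, and the first traceback ends in cloudpickle rejecting a file opened in mode `w`. The following is a minimal sketch of that failure mode, not the test's code. It uses the standard-library `pickle` rather than cloudpickle, and `can_pickle`, `captured_state`, and `plain_state` are hypothetical names standing in for whatever the callback captured. Which object actually held the write-mode file is not shown in the log.

```python
import os
import pickle
import tempfile


def can_pickle(obj) -> bool:
    """Return True if the standard-library pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError):
        # Open file handles (among other objects) are rejected here.
        return False


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "unit-tests.log"), "w") as log_file:
        # Hypothetical stand-in for state reached through `self`:
        # ordinary data plus a file handle opened for writing.
        captured_state = {"expected_rows": [("a", "2")], "log": log_file}
        # The same data without the handle, as a callback that only
        # touches its own arguments would need.
        plain_state = {"expected_rows": [("a", "2")]}

        state_ok = can_pickle(captured_state)
        plain_ok = can_pickle(plain_state)

print(state_ok, plain_ok)
```

Keeping the callback free of references to the enclosing test object, as the restored `else:` branch does, avoids having to serialize that state at all.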


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
