This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 564514899ca7 Revert "[SPARK-51758][SS][FOLLOWUP][TESTS] Fix flaky test around watermark due to additional batch causing empty df"
564514899ca7 is described below

commit 564514899ca7f4479133ee46fda96201c6e24875
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Apr 17 19:03:34 2025 +0900

Revert "[SPARK-51758][SS][FOLLOWUP][TESTS] Fix flaky test around watermark due to additional batch causing empty df"

This reverts https://github.com/apache/spark/pull/50615.

Manually tested on master:
```
(spark_313) ➜  spark git:(master) python/run-tests -k --python-executables python3 --testnames 'pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time'
Running PySpark tests. Output is in /Users/ruifeng.zheng/Dev/spark/python/unit-tests.log
Will test against the following Python executables: ['python3']
Will test the following Python tests: ['pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time']
python3 python_implementation is CPython
python3 version is: Python 3.13.3
Starting test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (temp output: /Users/ruifeng.zheng/Dev/spark/python/target/75f38dd9-30c8-4122-b88f-047c33bff8ee/python3__pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state_TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time__1ec968gn.log)
Running tests...
----------------------------------------------------------------------
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamDuration to Some(1s) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.execute.reattachable.senderMaxStreamDuration". See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
  warnings.warn(warn)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.execute.reattachable.senderMaxStreamSize to Some(123) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.execute.reattachable.senderMaxStreamSize". See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
  warnings.warn(warn)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.connect.authenticate.token to Some(deadbeef) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.connect.authenticate.token". See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
  warnings.warn(warn)
test_transform_with_state_with_wmark_and_non_event_time (pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time) ... ERROR (4.823s)

======================================================================
ERROR [4.823s]: test_transform_with_state_with_wmark_and_non_event_time (pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/streaming/readwriter.py", line 587, in foreachBatch
    self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps(
        func
    )
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/serializers.py", line 460, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/cloudpickle/cloudpickle.py", line 966, in _file_reduce
    raise pickle.PicklingError(
        "Cannot pickle files that are not opened for reading: %s" % obj.mode
    )
_pickle.PicklingError: Cannot pickle files that are not opened for reading: w

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 624, in test_transform_with_state_with_wmark_and_non_event_time
    self._test_transform_with_state_in_pandas_event_time(
        MinEventTimeStatefulProcessor(), check_results, "None"
    )
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 561, in _test_transform_with_state_in_pandas_event_time
    .foreachBatch(check_results)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/streaming/readwriter.py", line 591, in foreachBatch
    raise PySparkPicklingError(
    ...<2 lines>...
    )
pyspark.errors.exceptions.base.PySparkPicklingError: [STREAMING_CONNECT_SERIALIZATION_ERROR] Cannot serialize the function `foreachBatch`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQuer [...]

----------------------------------------------------------------------
Ran 1 test in 8.169s

FAILED (errors=1)

Generating XML reports...
Generated XML report: target/test-reports/TEST-pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state.TransformWithStateInPandasParityTests-20250417173545.xml
Had test failures in pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time with python3; see logs.
```
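The failure is the Spark Connect restriction spelled out in the error message: a `foreachBatch` callback is serialized with cloudpickle and shipped to the server when it is registered, so it must not reference a `SparkSession` directly. The reverted change did exactly that via `self.spark` (see the diff below), and pickling `self` drags the session along. A minimal sketch of the broken and the recommended patterns follows; the Connect URL, the rate source, and the callback names are illustrative placeholders, not taken from the test:

```python
from pyspark.sql import SparkSession

# Hypothetical Spark Connect session; the URL is a placeholder.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()


def bad_check_results(batch_df, batch_id):
    # References the outer `spark` session, which cloudpickle would try to
    # serialize; on Spark Connect, .foreachBatch() then raises
    # STREAMING_CONNECT_SERIALIZATION_ERROR at registration time.
    for q in spark.streams.active:
        q.stop()


def good_check_results(batch_df, batch_id):
    # Reach the session through the first parameter instead, as the error
    # message advises; the pickled closure then carries nothing session-bound.
    for q in batch_df.sparkSession.streams.active:
        q.stop()


query = (
    spark.readStream.format("rate").load()
    .writeStream.foreachBatch(good_check_results)
    .start()
)
```

With the revert applied, `check_results` no longer touches `self.spark` at all, so the closure pickles cleanly and the same test passes, as the run below shows.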
After the revert:
```
(spark_313) ➜  spark git:(test_test) python/run-tests -k --python-executables python3 --testnames 'pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time'
Running PySpark tests. Output is in /Users/ruifeng.zheng/Dev/spark/python/unit-tests.log
Will test against the following Python executables: ['python3']
Will test the following Python tests: ['pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time']
python3 python_implementation is CPython
python3 version is: Python 3.13.3
Starting test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (temp output: /Users/ruifeng.zheng/Dev/spark/python/target/9c5f324b-d914-4a50-bdf6-20fcfa9474c7/python3__pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state_TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time__fkrm_xmh.log)
Finished test(python3): pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state TransformWithStateInPandasParityTests.test_transform_with_state_with_wmark_and_non_event_time (51s)
Tests passed in 51 seconds
```

Closes #50621 from zhengruifeng/test_test.

Authored-by: Ruifeng Zheng <ruife...@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 25bf063d20e6..d5fcb7fdf91d 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -612,14 +612,11 @@ class TransformWithStateInPandasTestsMixin:
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="4"),
                 }
-            elif batch_id == 2:
+            else:
                 # watermark for late event = 10 and min event = 2 with no filtering
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="2"),
                 }
-            else:
-                for q in self.spark.streams.active:
-                    q.stop()
 
         self._test_transform_with_state_in_pandas_event_time(
             MinEventTimeStatefulProcessor(), check_results, "None"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org