This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5fba0b476ade [SPARK-51758][SS] Fix test case related to extra batch causing empty df due to watermark
5fba0b476ade is described below

commit 5fba0b476ade94c8fa50b8f367b601dca70c8b3b
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Apr 18 06:26:30 2025 +0900

    [SPARK-51758][SS] Fix test case related to extra batch causing empty df due to watermark
    
    ### What changes were proposed in this pull request?
    Fix a test case related to an extra batch that produces an empty DataFrame because of the watermark.
    
    ### Why are the changes needed?
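    Without this change, the test can be flaky: after batch 2, an extra batch may run because of the watermark and produce an empty DataFrame, which the catch-all `else` branch then checks against the batch 2 result, failing the assertion.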
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    ```
    Will test the following Python tests: ['pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests.test_transform_with_state_with_wmark_and_non_event_time']
    python3.9 python_implementation is CPython
    python3.9 version is: Python 3.9.20
    Starting test(python3.9): pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests.test_transform_with_state_with_wmark_and_non_event_time (temp output: /Users/anish.shrigondekar/spark/spark/python/target/256cf6e3-872e-4590-b7d9-26677e48c48f/python3.9__pyspark.sql.tests.pandas.test_pandas_transform_with_state_TransformWithStateInPandasTests.test_transform_with_state_with_wmark_and_non_event_time__co1fbfji.log)
    Finished test(python3.9): pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests.test_transform_with_state_with_wmark_and_non_event_time (51s)
    Tests passed in 51 seconds
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50626 from anishshri-db/task/SPARK-51758-test-fix.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 335063fc8f888654f6246f667973f4376bff9241)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
index 44ea90ab6659..d3ea5e534443 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py
@@ -641,11 +641,14 @@ class TransformWithStateInPandasTestsMixin:
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="4"),
                 }
-            else:
+            elif batch_id == 2:
                 # watermark for late event = 10 and min event = 2 with no filtering
                 assert set(batch_df.sort("id").collect()) == {
                     Row(id="a", timestamp="2"),
                 }
+            else:
+                for q in batch_df.sparkSession.streams.active:
+                    q.stop()
 
         self._test_transform_with_state_in_pandas_event_time(
             MinEventTimeStatefulProcessor(), check_results, "None"

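The pattern this patch applies can be illustrated outside Spark with a minimal, hypothetical sketch. `FakeQuery`, `FakeBatch`, `EXPECTED`, and `check_results` below are illustrative stand-ins, not PySpark classes or the test's actual helpers, and the expected rows are made up. The idea is that a foreachBatch-style check function asserts only on the batch IDs it knows about. For any later batch (for example an extra, possibly empty batch that runs after the watermark advances), it stops the active queries instead of asserting. That way the test no longer depends on whether such a batch happens to run.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

# (id, timestamp) pair, loosely mirroring Row(id=..., timestamp=...) in the test.
Row = Tuple[str, str]


@dataclass
class FakeQuery:
    """Hypothetical stand-in for a streaming query; only models stop()."""
    is_active: bool = True

    def stop(self) -> None:
        self.is_active = False


@dataclass
class FakeBatch:
    """Hypothetical stand-in for the micro-batch DataFrame given to the callback."""
    rows: Set[Row]
    active_queries: List[FakeQuery] = field(default_factory=list)


# Hypothetical expectations: the only batch IDs the check asserts on.
EXPECTED: Dict[int, Set[Row]] = {
    0: {("a", "4")},
    1: {("a", "2")},
}


def check_results(batch: FakeBatch, batch_id: int) -> None:
    if batch_id in EXPECTED:
        # Known batch: its contents must match exactly.
        assert batch.rows == EXPECTED[batch_id], (batch_id, batch.rows)
    else:
        # Extra batch whose output may be empty: skip the assertion and
        # stop every active query so the run ends cleanly.
        for q in batch.active_queries:
            q.stop()


if __name__ == "__main__":
    query = FakeQuery()
    check_results(FakeBatch({("a", "4")}, [query]), 0)
    check_results(FakeBatch({("a", "2")}, [query]), 1)
    check_results(FakeBatch(set(), [query]), 2)  # empty extra batch, no AssertionError
    assert not query.is_active
```

With the old catch-all `else` that asserted on every batch after the known ones, the empty batch 2 in this sketch would have raised `AssertionError`. Listing the known batches explicitly and treating everything else as "stop" removes that timing dependence.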
