This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 984d578c162d [SPARK-53729][PYTHON][CONNECT] Fix serialization of 
`pyspark.sql.connect.window.WindowSpec`
984d578c162d is described below

commit 984d578c162de2e8ebcfc5eee15f1f2822a9f723
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Sep 26 17:24:35 2025 +0800

    [SPARK-53729][PYTHON][CONNECT] Fix serialization of 
`pyspark.sql.connect.window.WindowSpec`
    
    ### What changes were proposed in this pull request?
    Fix serialization of `pyspark.sql.connect.window.WindowSpec`
    
    ### Why are the changes needed?
    this query fails
    ```
    from pyspark.sql import Window, WindowSpec
    from pyspark.sql.functions import *
    
    df = ( spark.readStream.format("rate-micro-batch")
            .option("rowsPerBatch", 1000)
            .option("numPartitions", 2)
            .load()
            .withColumn("some_string",lit("apples")) )
    
    window_spec = Window.partitionBy("some_string").orderBy("value")
    
    def get_calculate_row_number_in_batch(window_spec):
        def calculate_row_number_in_batch(df, batch_id):
            df.withColumn("row_number", row_number().over(window_spec)).show()
        return calculate_row_number_in_batch
    
    (df.writeStream
     .queryName('streaming_query')
     .foreachBatch(get_calculate_row_number_in_batch(window_spec))
     .trigger(processingTime='30 seconds')
     .start()
     )
    ```
    
    due to
    ```
    TypeError: WindowSpec.__new__() missing 3 required positional arguments: 
'partitionSpec', 'orderSpec', and 'frame'
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    bug-fix
    
    ### How was this patch tested?
    added test, and also manually checked above-mentioned query
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52464 from zhengruifeng/fix_win_spec.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/window.py                   |  5 ++++-
 python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/window.py 
b/python/pyspark/sql/connect/window.py
index bf6d60df6350..952258e8db48 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -18,7 +18,7 @@ from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
 
-from typing import TYPE_CHECKING, Union, Sequence, List, Optional, Tuple, 
cast, Iterable
+from typing import TYPE_CHECKING, Any, Union, Sequence, List, Optional, Tuple, 
cast, Iterable
 
 from pyspark.sql.column import Column
 from pyspark.sql.window import (
@@ -69,6 +69,9 @@ class WindowSpec(ParentWindowSpec):
         self.__init__(partitionSpec, orderSpec, frame)  # type: ignore[misc]
         return self
 
+    def __getnewargs__(self) -> Tuple[Any, ...]:
+        return (self._partitionSpec, self._orderSpec, self._frame)
+
     def __init__(
         self,
         partitionSpec: Sequence[Expression],
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 473bd328d5ad..c5b16f0da49e 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -135,6 +135,16 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         cdf2 = loads(data)
         self.assertEqual(cdf.collect(), cdf2.collect())
 
+    def test_window_spec_serialization(self):
+        from pyspark.sql.connect.window import Window
+        from pyspark.serializers import CPickleSerializer
+
+        pickle_ser = CPickleSerializer()
+        w = Window.partitionBy("some_string").orderBy("value")
+        b = pickle_ser.dumps(w)
+        w2 = pickle_ser.loads(b)
+        self.assertEqual(str(w), str(w2))
+
     def test_df_getattr_behavior(self):
         cdf = self.connect.range(10)
         sdf = self.spark.range(10)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to