This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 984d578c162d [SPARK-53729][PYTHON][CONNECT] Fix serialization of `pyspark.sql.connect.window.WindowSpec`
984d578c162d is described below
commit 984d578c162de2e8ebcfc5eee15f1f2822a9f723
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Sep 26 17:24:35 2025 +0800
[SPARK-53729][PYTHON][CONNECT] Fix serialization of `pyspark.sql.connect.window.WindowSpec`
### What changes were proposed in this pull request?
Fix serialization of `pyspark.sql.connect.window.WindowSpec` by implementing `__getnewargs__`, so that instances survive a pickle round-trip.
### Why are the changes needed?
The following streaming query fails:
```
from pyspark.sql import Window, WindowSpec
from pyspark.sql.functions import *

df = (
    spark.readStream.format("rate-micro-batch")
    .option("rowsPerBatch", 1000)
    .option("numPartitions", 2)
    .load()
    .withColumn("some_string", lit("apples"))
)

window_spec = Window.partitionBy("some_string").orderBy("value")

def get_calculate_row_number_in_batch(window_spec):
    def calculate_row_number_in_batch(df, batch_id):
        df.withColumn("row_number", row_number().over(window_spec)).show()
    return calculate_row_number_in_batch

(
    df.writeStream
    .queryName('streaming_query')
    .foreachBatch(get_calculate_row_number_in_batch(window_spec))
    .trigger(processingTime='30 seconds')
    .start()
)
```
because `foreachBatch` pickles the user function together with its closure, which captures the `WindowSpec`; unpickling it then fails with
```
TypeError: WindowSpec.__new__() missing 3 required positional arguments: 'partitionSpec', 'orderSpec', and 'frame'
```
### Does this PR introduce _any_ user-facing change?
No, aside from the bug fix: pickling a Connect `WindowSpec` now succeeds instead of raising `TypeError`.
### How was this patch tested?
Added a unit test (`test_window_spec_serialization`), and also manually verified the query above.
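For reference, the added test's round-trip can also be reproduced with the stdlib `pickle` module, which the test's `CPickleSerializer` wraps (a sketch, assuming a PySpark Connect client installation):
```
import pickle
from pyspark.sql.connect.window import Window

w = Window.partitionBy("some_string").orderBy("value")
b = pickle.dumps(w)   # serialization already worked before the fix
w2 = pickle.loads(b)  # this step raised the TypeError above
assert str(w) == str(w2)
```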
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52464 from zhengruifeng/fix_win_spec.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/window.py | 5 ++++-
python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/connect/window.py b/python/pyspark/sql/connect/window.py
index bf6d60df6350..952258e8db48 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -18,7 +18,7 @@ from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
 
-from typing import TYPE_CHECKING, Union, Sequence, List, Optional, Tuple, cast, Iterable
+from typing import TYPE_CHECKING, Any, Union, Sequence, List, Optional, Tuple, cast, Iterable
 
 from pyspark.sql.column import Column
 from pyspark.sql.window import (
@@ -69,6 +69,9 @@ class WindowSpec(ParentWindowSpec):
         self.__init__(partitionSpec, orderSpec, frame)  # type: ignore[misc]
         return self
 
+    def __getnewargs__(self) -> Tuple[Any, ...]:
+        return (self._partitionSpec, self._orderSpec, self._frame)
+
     def __init__(
         self,
         partitionSpec: Sequence[Expression],
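Putting the two hunks above together, the patched class looks roughly like this (a simplified, self-contained sketch; the real parent-class dispatch and typed `Expression` parameters in `pyspark.sql.connect.window` are elided, with `object` standing in for the parent):
```
import pickle
from typing import Any, Tuple

class WindowSpec:
    def __new__(cls, partitionSpec: Any, orderSpec: Any, frame: Any) -> "WindowSpec":
        self = object.__new__(cls)
        self.__init__(partitionSpec, orderSpec, frame)
        return self

    def __getnewargs__(self) -> Tuple[Any, ...]:
        # The fix: hand pickle the three arguments that __new__ requires.
        return (self._partitionSpec, self._orderSpec, self._frame)

    def __init__(self, partitionSpec: Any, orderSpec: Any, frame: Any) -> None:
        self._partitionSpec = partitionSpec
        self._orderSpec = orderSpec
        self._frame = frame

w2 = pickle.loads(pickle.dumps(WindowSpec(["p"], ["o"], None)))
assert w2._partitionSpec == ["p"]
```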
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 473bd328d5ad..c5b16f0da49e 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -135,6 +135,16 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         cdf2 = loads(data)
         self.assertEqual(cdf.collect(), cdf2.collect())
 
+    def test_window_spec_serialization(self):
+        from pyspark.sql.connect.window import Window
+        from pyspark.serializers import CPickleSerializer
+
+        pickle_ser = CPickleSerializer()
+        w = Window.partitionBy("some_string").orderBy("value")
+        b = pickle_ser.dumps(w)
+        w2 = pickle_ser.loads(b)
+        self.assertEqual(str(w), str(w2))
+
     def test_df_getattr_behavior(self):
         cdf = self.connect.range(10)
         sdf = self.spark.range(10)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]