This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f7b20da56156 [SPARK-54842][PYTHON][TESTS] Fix `test_arrow_udf_chained_iii` in Python-Only MacOS26
f7b20da56156 is described below
commit f7b20da56156c9944b92e7d10f5cb2ad3a31878d
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 26 08:17:06 2025 +0800
[SPARK-54842][PYTHON][TESTS] Fix `test_arrow_udf_chained_iii` in Python-Only MacOS26
### What changes were proposed in this pull request?
Attempt to fix https://github.com/apache/spark/actions/runs/20495264978/job/58904792835:
```
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3502, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 781, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, wrap_and_init_stream(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 120, in dump_stream
    for batch in iterator:
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 765, in wrap_and_init_stream
    for packed in iterator:
  File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 2954, in func
    for result_batch, result_type in result_iter:
  File "/Users/runner/work/spark/spark/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py", line 930, in <lambda>
    lambda it: map(lambda x: pa.compute.subtract(x, 1), it),
                             ^^^^^^^^^^
AttributeError: module 'pyarrow' has no attribute 'compute'
```
This test passed on macOS 26 before, and the parity test on Spark Connect passes.
### Why are the changes needed?
I suspect there is a cloudpickle pitfall when dealing with complicated nested lambdas; I remember resolving a similar issue by changing the import.
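A minimal sketch of the suspected failure mode (an assumption, not a confirmed reproduction; whether `import pyarrow` also pulls in the `pyarrow.compute` submodule depends on the pyarrow version/environment):
```python
import pyarrow as pa

# "pa.compute" is an attribute lookup on the pyarrow module at call time.
# It only succeeds if "pyarrow.compute" has already been imported in this
# process; on a fresh worker interpreter it can fail with:
#   AttributeError: module 'pyarrow' has no attribute 'compute'
f = lambda x: pa.compute.add(x, 1)

# The fix imports the submodule explicitly, so it is guaranteed to be
# loaded before the (possibly cloudpickle-shipped) lambda runs:
import pyarrow.compute as pc

g = lambda x: pc.add(x, 1)
```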
### Does this PR introduce _any_ user-facing change?
no, test-only
### How was this patch tested?
Cannot reproduce this issue locally; will monitor the CI.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #53607 from zhengruifeng/fix_test_arrow_udf_chained_iii.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index a0d805a3ab27..710e8322de21 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -916,18 +916,19 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_chained_iii(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_f = arrow_udf(lambda x: pa.compute.add(x, 1), LongType())
-        scalar_g = arrow_udf(lambda x: pa.compute.subtract(x, 1), LongType())
-        scalar_m = arrow_udf(lambda x, y: pa.compute.multiply(x, y), LongType())
+        scalar_f = arrow_udf(lambda x: pc.add(x, 1), LongType())
+        scalar_g = arrow_udf(lambda x: pc.subtract(x, 1), LongType())
+        scalar_m = arrow_udf(lambda x, y: pc.multiply(x, y), LongType())
 
         iter_f = arrow_udf(
-            lambda it: map(lambda x: pa.compute.add(x, 1), it),
+            lambda it: map(lambda x: pc.add(x, 1), it),
             LongType(),
             ArrowUDFType.SCALAR_ITER,
         )
         iter_g = arrow_udf(
-            lambda it: map(lambda x: pa.compute.subtract(x, 1), it),
+            lambda it: map(lambda x: pc.subtract(x, 1), it),
             LongType(),
             ArrowUDFType.SCALAR_ITER,
         )
@@ -935,7 +936,7 @@ class ScalarArrowUDFTestsMixin:
         @arrow_udf(LongType())
         def iter_m(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.multiply(a, b)
+                yield pc.multiply(a, b)
 
         df = self.spark.range(10)
         expected = df.select(((F.col("id") + 1) * (F.col("id") - 1)).alias("res")).collect()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]