This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b20da56156 [SPARK-54842][PYTHON][TESTS] Fix `test_arrow_udf_chained_iii` in Python-Only MacOS26
f7b20da56156 is described below

commit f7b20da56156c9944b92e7d10f5cb2ad3a31878d
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Dec 26 08:17:06 2025 +0800

    [SPARK-54842][PYTHON][TESTS] Fix `test_arrow_udf_chained_iii` in Python-Only MacOS26
    
    ### What changes were proposed in this pull request?
    This PR attempts to fix https://github.com/apache/spark/actions/runs/20495264978/job/58904792835:
    
    ```
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
        process()
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3502, in process
        serializer.dump_stream(out_iter, outfile)
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 781, in dump_stream
        return ArrowStreamSerializer.dump_stream(self, wrap_and_init_stream(), stream)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 120, in dump_stream
        for batch in iterator:
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 765, in wrap_and_init_stream
        for packed in iterator:
      File "/Users/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 2954, in func
        for result_batch, result_type in result_iter:
      File "/Users/runner/work/spark/spark/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py", line 930, in <lambda>
        lambda it: map(lambda x: pa.compute.subtract(x, 1), it),
                                 ^^^^^^^^^^
    AttributeError: module 'pyarrow' has no attribute 'compute'
    ```
    
    This test passed before on MacOS 26, and the corresponding parity test on Spark Connect passes.
    
    ### Why are the changes needed?
    
    I suspect there is a cloudpickle pitfall when dealing with complicated nested lambdas; I recall resolving a similar issue before by changing the import.
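    The suspected pitfall can be sketched without pyspark at all. In Python, importing a package does not automatically import its submodules, so a deserialized lambda that only holds a reference to the parent package (here `pa`) can hit `AttributeError` on an attribute access like `pa.compute` in a worker process that never imported the submodule. The sketch below uses `json`/`json.tool` as a stand-in for `pyarrow`/`pyarrow.compute` (an assumption for illustration, not pyspark or pyarrow code):

```python
import sys

# Simulate a freshly started worker process where neither the package
# nor the submodule of interest has been imported yet.
sys.modules.pop("json.tool", None)
sys.modules.pop("json", None)

import json  # package import only; json.tool is NOT loaded as a side effect

before = hasattr(json, "tool")  # False: the submodule is not bound yet

# Importing the submodule explicitly (the pattern the fix switches to,
# i.e. `import pyarrow.compute as pc`) loads it and binds it on the
# parent package, so later attribute access cannot fail this way.
import json.tool as jt

after = hasattr(json, "tool")  # True

print(before, after)
```

    This is also why the fix makes the lambdas close over `pc` directly: the pickled reference then names the submodule itself, which is imported on deserialization, instead of relying on `pa.compute` being bound as an attribute on the worker.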
    
    ### Does this PR introduce _any_ user-facing change?
    no, test-only
    
    ### How was this patch tested?
    Cannot reproduce this issue locally; will monitor the CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53607 from zhengruifeng/fix_test_arrow_udf_chained_iii.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index a0d805a3ab27..710e8322de21 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -916,18 +916,19 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_chained_iii(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_f = arrow_udf(lambda x: pa.compute.add(x, 1), LongType())
-        scalar_g = arrow_udf(lambda x: pa.compute.subtract(x, 1), LongType())
-        scalar_m = arrow_udf(lambda x, y: pa.compute.multiply(x, y), LongType())
+        scalar_f = arrow_udf(lambda x: pc.add(x, 1), LongType())
+        scalar_g = arrow_udf(lambda x: pc.subtract(x, 1), LongType())
+        scalar_m = arrow_udf(lambda x, y: pc.multiply(x, y), LongType())
 
         iter_f = arrow_udf(
-            lambda it: map(lambda x: pa.compute.add(x, 1), it),
+            lambda it: map(lambda x: pc.add(x, 1), it),
             LongType(),
             ArrowUDFType.SCALAR_ITER,
         )
         iter_g = arrow_udf(
-            lambda it: map(lambda x: pa.compute.subtract(x, 1), it),
+            lambda it: map(lambda x: pc.subtract(x, 1), it),
             LongType(),
             ArrowUDFType.SCALAR_ITER,
         )
@@ -935,7 +936,7 @@ class ScalarArrowUDFTestsMixin:
         @arrow_udf(LongType())
         def iter_m(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.multiply(a, b)
+                yield pc.multiply(a, b)
 
         df = self.spark.range(10)
         expected = df.select(((F.col("id") + 1) * (F.col("id") - 1)).alias("res")).collect()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
