This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e69821af92 [SPARK-53867][PYTHON][FOLLOW-UP] Fix `pa.concat_batches` for old pyarrow versions
e3e69821af92 is described below

commit e3e69821af9282a3c61702cda5b83f936b2b4eef
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Oct 15 11:55:31 2025 +0800

    [SPARK-53867][PYTHON][FOLLOW-UP] Fix `pa.concat_batches` for old pyarrow versions
    
    ### What changes were proposed in this pull request?
    Fix `pa.concat_batches` for old pyarrow versions
    
    ### Why are the changes needed?
    
    `pa.concat_batches` is only available in pyarrow 19+.
    
    To restore the scheduled workflows: https://github.com/apache/spark/actions/runs/18491442406/job/52685995258
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Manually tested against pyarrow 15.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52617 from zhengruifeng/fix_concat_batch.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 89b30668424a..55fbfe4b34bb 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -1175,7 +1175,14 @@ class ArrowStreamAggArrowUDFSerializer(ArrowStreamArrowUDFSerializer):
             dataframes_in_group = read_int(stream)
 
             if dataframes_in_group == 1:
-                yield pa.concat_batches(ArrowStreamSerializer.load_stream(self, stream))
+                batches = ArrowStreamSerializer.load_stream(self, stream)
+                if hasattr(pa, "concat_batches"):
+                    yield pa.concat_batches(batches)
+                else:
+                    # pyarrow.concat_batches not supported in old versions
+                    yield pa.RecordBatch.from_struct_array(
+                        pa.concat_arrays([b.to_struct_array() for b in batches])
+                    )
 
             elif dataframes_in_group != 0:
                 raise PySparkValueError(
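
For context, here is a minimal standalone sketch of the fallback this patch uses (the helper name `concat_record_batches` and the sample data are hypothetical, not part of the Spark code): on pyarrow builds that lack `pa.concat_batches`, each `RecordBatch` is converted to a `StructArray`, the struct arrays are concatenated, and the result is flattened back into a single `RecordBatch`.

    import pyarrow as pa

    def concat_record_batches(batches):
        # Hypothetical helper mirroring the patch: prefer the native
        # pa.concat_batches (pyarrow 19+ per the commit message), and
        # otherwise emulate it via a struct-array round trip.
        batches = list(batches)
        if hasattr(pa, "concat_batches"):
            return pa.concat_batches(batches)
        # Each RecordBatch becomes one StructArray; pa.concat_arrays
        # joins them, and from_struct_array flattens the result back
        # into a single RecordBatch.
        return pa.RecordBatch.from_struct_array(
            pa.concat_arrays([b.to_struct_array() for b in batches])
        )

    # Two small batches sharing a schema (sample data, assumed here).
    b1 = pa.RecordBatch.from_pydict({"id": [1, 2], "v": [0.1, 0.2]})
    b2 = pa.RecordBatch.from_pydict({"id": [3], "v": [0.3]})
    merged = concat_record_batches([b1, b2])
    assert merged.num_rows == 3

Either branch yields one batch containing all rows of the group, which is what the aggregation serializer expects when a group arrives as multiple Arrow batches.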


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
