This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d61a7b93ae02 [SPARK-53029][FOLLOWUP] Update PyArrow StructType usage 
to avoid .names attribute for compatibility with PyArrow
d61a7b93ae02 is described below

commit d61a7b93ae02d04f580ae2ce911484a83018dacb
Author: Allison Wang <[email protected]>
AuthorDate: Wed Sep 10 16:00:41 2025 +0900

    [SPARK-53029][FOLLOWUP] Update PyArrow StructType usage to avoid .names 
attribute for compatibility with PyArrow
    
    ### What changes were proposed in this pull request?
    
    This is a follow up for SPARK-53029. The `.names` attribute for PyArrow's 
StructType was added in PyArrow version 18.0.0, but the minimum required 
version for pyarrow now is 15.0.0.
    
    ### Why are the changes needed?
    
    To fix compatibility issue.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52289 from allisonwang-db/SPARK-53426-fix.
    
    Lead-authored-by: Allison Wang <[email protected]>
    Co-authored-by: Allison Wang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py          |  4 +--
 python/pyspark/sql/tests/arrow/test_arrow_udtf.py | 31 +++++++++++++++++++++++
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 58c962896df6..860335d1ff0f 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -267,7 +267,7 @@ class 
ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
                 if batch.num_columns == 0:
                     coerced_batch = batch  # skip type coercion
                 else:
-                    expected_field_names = arrow_return_type.names
+                    expected_field_names = [field.name for field in 
arrow_return_type]
                     actual_field_names = batch.schema.names
 
                     if expected_field_names != actual_field_names:
@@ -283,7 +283,7 @@ class 
ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
                         coerced_array = self._create_array(original_array, 
field.type)
                         coerced_arrays.append(coerced_array)
                     coerced_batch = pa.RecordBatch.from_arrays(
-                        coerced_arrays, names=arrow_return_type.names
+                        coerced_arrays, names=expected_field_names
                     )
                 yield coerced_batch, arrow_return_type
 
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udtf.py 
b/python/pyspark/sql/tests/arrow/test_arrow_udtf.py
index 7c8f85a3790f..741673341dde 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udtf.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udtf.py
@@ -699,6 +699,37 @@ class ArrowUDTFTestsMixin:
         expected_df = self.spark.createDataFrame([(60, 180)], "computed_value 
int, multiplied int")
         assertDataFrameEqual(result_df, expected_df)
 
+    def test_arrow_udtf_with_named_arguments(self):
+        @arrow_udtf(returnType="x int, y int, sum int")
+        class NamedArgsUDTF:
+            def eval(self, x: "pa.Array", y: "pa.Array") -> 
Iterator["pa.Table"]:
+                assert isinstance(x, pa.Array), f"Expected pa.Array, got 
{type(x)}"
+                assert isinstance(y, pa.Array), f"Expected pa.Array, got 
{type(y)}"
+
+                x_val = x[0].as_py()
+                y_val = y[0].as_py()
+                result_table = pa.table(
+                    {
+                        "x": pa.array([x_val], type=pa.int32()),
+                        "y": pa.array([y_val], type=pa.int32()),
+                        "sum": pa.array([x_val + y_val], type=pa.int32()),
+                    }
+                )
+                yield result_table
+
+        # Test SQL registration and usage with named arguments
+        self.spark.udtf.register("named_args_udtf", NamedArgsUDTF)
+
+        # Test with named arguments in SQL
+        sql_result_df = self.spark.sql("SELECT * FROM named_args_udtf(y => 10, 
x => 5)")
+        expected_df = self.spark.createDataFrame([(5, 10, 15)], "x int, y int, 
sum int")
+        assertDataFrameEqual(sql_result_df, expected_df)
+
+        # Test with mixed positional and named arguments
+        sql_result_df2 = self.spark.sql("SELECT * FROM named_args_udtf(7, y => 
3)")
+        expected_df2 = self.spark.createDataFrame([(7, 3, 10)], "x int, y int, 
sum int")
+        assertDataFrameEqual(sql_result_df2, expected_df2)
+
 
 class ArrowUDTFTests(ArrowUDTFTestsMixin, ReusedSQLTestCase):
     pass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to