This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a82b4158d448 [SPARK-52946][PYTHON] Fix Arrow-optimized Python UDTF to support large var types
a82b4158d448 is described below

commit a82b4158d448c0dadc4e81e76a32521ec0b9d436
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Thu Jul 24 16:02:26 2025 -0700

    [SPARK-52946][PYTHON] Fix Arrow-optimized Python UDTF to support large var types
    
    ### What changes were proposed in this pull request?
    
    Fixes Arrow-optimized Python UDTF to support large var types.
    
    ### Why are the changes needed?
    
    The Arrow-optimized Python UDTF path fails when `spark.sql.execution.arrow.useLargeVarTypes` is enabled: the result-type check expects the plain Arrow types (e.g. `string`) while the data actually arrives with their large variants (e.g. `large_string`).
    
    ```py
    >>> @udtf(returnType="a: string", useArrow=True)
    ... class TestUDTF:
    ...     def eval(self, a: int):
    ...         yield str(a)
    ...
    >>> spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", True)
    >>> TestUDTF(lit(1)).show()
    ...
    pyspark.errors.exceptions.base.PySparkRuntimeError: [RESULT_TYPE_MISMATCH_FOR_ARROW_UDF] Columns do not match in their data type: column 'a' (expected string, actual large_string).
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the config `spark.sql.execution.arrow.useLargeVarTypes=True` now works with Arrow-optimized Python UDTFs.
    
    ### How was this patch tested?
    
    Added the related tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51655 from ueshin/issues/SPARK-52946/use_large_var_types.
    
    Authored-by: Takuya Ueshin <ues...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 python/pyspark/sql/tests/test_udtf.py | 14 ++++++++++++++
 python/pyspark/worker.py              |  2 +-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py
index f7fd8fe6c5b7..43ec95c2a076 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -2935,6 +2935,20 @@ class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
 
         assertDataFrameEqual(TestUDTF(lit(1)), [Row(a=1)])
 
+    def test_udtf_use_large_var_types(self):
+        for use_large_var_types in [True, False]:
+            with self.subTest(use_large_var_types=use_large_var_types):
+                with self.sql_conf(
+                    {"spark.sql.execution.arrow.useLargeVarTypes": use_large_var_types}
+                ):
+
+                    @udtf(returnType="a: string")
+                    class TestUDTF:
+                        def eval(self, a: int):
+                            yield str(a)
+
+                    assertDataFrameEqual(TestUDTF(lit(1)), [Row(a="1")])
+
     def test_numeric_output_type_casting(self):
         class TestUDTF:
             def eval(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3c869b3dba90..63fc8cc842b0 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1792,7 +1792,7 @@ def read_udtf(pickleSer, infile, eval_type):
                    pa.Table.from_batches([result], schema=pa.schema(list(arrow_return_type))),
                     assign_cols_by_name=False,
                     expected_cols_and_types=[
-                        (col.name, to_arrow_type(col.dataType)) for col in return_type.fields
+                        (field.name, field.type) for field in arrow_return_type
                     ],
                 )
                 return result


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
