This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25892a77ff9c [SPARK-52949][PYTHON] Avoid roundtrip between RecordBatch 
and Table in Arrow-optimized Python UDTF
25892a77ff9c is described below

commit 25892a77ff9c0f4d5c710b360c4538e7fefde9ab
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Fri Jul 25 11:11:15 2025 -0700

    [SPARK-52949][PYTHON] Avoid roundtrip between RecordBatch and Table in 
Arrow-optimized Python UDTF
    
    ### What changes were proposed in this pull request?
    
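    Avoids a roundtrip between `RecordBatch` and `Table` in the Arrow-optimized 
Python UDTF code path: `convert_to_arrow` now returns a `pa.Table`, 
`verify_result` checks that `Table` directly, and `to_batches()` is called 
only after verification.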
    
    ### Why are the changes needed?
    
    In the Arrow-optimized Python UDTF code path, there is an unnecessary 
roundtrip between `RecordBatch` and `Table`.
    We can defer the conversion to `RecordBatch` until after the result 
verification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51659 from ueshin/issues/SPARK-52949/arrow_udtf.
    
    Authored-by: Takuya Ueshin <ues...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 python/pyspark/worker.py | 28 +++++++++++++---------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index f6f787755014..342ebc14311f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1638,7 +1638,7 @@ def read_udtf(pickleSer, infile, eval_type):
             import pandas as pd
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=use_large_var_types(runner_conf)
+                return_type, prefers_large_types=prefers_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -1757,12 +1757,12 @@ def read_udtf(pickleSer, infile, eval_type):
             import pyarrow as pa
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=use_large_var_types(runner_conf)
+                return_type, prefers_large_types=prefers_large_var_types
             )
             return_type_size = len(return_type)
 
             def verify_result(result):
-                if not isinstance(result, pa.RecordBatch):
+                if not isinstance(result, pa.Table):
                     raise PySparkTypeError(
                         errorClass="INVALID_ARROW_UDTF_RETURN_TYPE",
                         messageParameters={
@@ -1776,20 +1776,20 @@ def read_udtf(pickleSer, infile, eval_type):
                 # rows or columns. Note that we avoid using `df.empty` here 
because the
                 # result dataframe may contain an empty row. For example, when 
a UDTF is
                 # defined as follows: def eval(self): yield tuple().
-                if len(result) > 0 or len(result.columns) > 0:
-                    if len(result.columns) != return_type_size:
+                if result.num_rows > 0 or result.num_columns > 0:
+                    if result.num_columns != return_type_size:
                         raise PySparkRuntimeError(
                             errorClass="UDTF_RETURN_SCHEMA_MISMATCH",
                             messageParameters={
                                 "expected": str(return_type_size),
-                                "actual": str(len(result.columns)),
+                                "actual": str(result.num_columns),
                                 "func": f.__name__,
                             },
                         )
 
                 # Verify the type and the schema of the result.
                 verify_arrow_result(
-                    pa.Table.from_batches([result], 
schema=pa.schema(list(arrow_return_type))),
+                    result,
                     assign_cols_by_name=False,
                     expected_cols_and_types=[
                         (field.name, field.type) for field in arrow_return_type
@@ -1832,9 +1832,7 @@ def read_udtf(pickleSer, infile, eval_type):
             def convert_to_arrow(data: Iterable):
                 data = list(check_return_value(data))
                 if len(data) == 0:
-                    return [
-                        pa.RecordBatch.from_pylist(data, 
schema=pa.schema(list(arrow_return_type)))
-                    ]
+                    return pa.Table.from_pylist(data, 
schema=pa.schema(list(arrow_return_type)))
 
                 def raise_conversion_error(original_exception):
                     raise PySparkRuntimeError(
@@ -1849,7 +1847,7 @@ def read_udtf(pickleSer, infile, eval_type):
                 try:
                     return LocalDataToArrowConversion.convert(
                         data, return_type, prefers_large_var_types
-                    ).to_batches()
+                    )
                 except PySparkValueError as e:
                     if e.getErrorClass() == "AXIS_LENGTH_MISMATCH":
                         raise PySparkRuntimeError(
@@ -1871,8 +1869,8 @@ def read_udtf(pickleSer, infile, eval_type):
 
             def evaluate(*args: pa.ChunkedArray):
                 if len(args) == 0:
-                    for batch in convert_to_arrow(func()):
-                        yield verify_result(batch), arrow_return_type
+                    for batch in 
verify_result(convert_to_arrow(func())).to_batches():
+                        yield batch, arrow_return_type
 
                 else:
                     list_args = list(args)
@@ -1883,8 +1881,8 @@ def read_udtf(pickleSer, infile, eval_type):
                         t, schema=schema, return_as_tuples=True
                     )
                     for row in rows:
-                        for batch in convert_to_arrow(func(*row)):
-                            yield verify_result(batch), arrow_return_type
+                        for batch in 
verify_result(convert_to_arrow(func(*row))).to_batches():
+                            yield batch, arrow_return_type
 
             return evaluate
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to