This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 25892a77ff9c [SPARK-52949][PYTHON] Avoid roundtrip between RecordBatch and Table in Arrow-optimized Python UDTF
25892a77ff9c is described below

commit 25892a77ff9c0f4d5c710b360c4538e7fefde9ab
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Fri Jul 25 11:11:15 2025 -0700

    [SPARK-52949][PYTHON] Avoid roundtrip between RecordBatch and Table in Arrow-optimized Python UDTF

    ### What changes were proposed in this pull request?

    Avoids a roundtrip between `RecordBatch` and `Table` in the Arrow-optimized Python UDTF path.

    ### Why are the changes needed?

    In the Arrow-optimized Python UDTF code path, there is an unnecessary roundtrip between `RecordBatch` and `Table`. We can defer converting to `RecordBatch` until after the result verification.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    The existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51659 from ueshin/issues/SPARK-52949/arrow_udtf.

    Authored-by: Takuya Ueshin <ues...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 python/pyspark/worker.py | 28 +++++++++++++---------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index f6f787755014..342ebc14311f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1638,7 +1638,7 @@ def read_udtf(pickleSer, infile, eval_type):
         import pandas as pd
 
         arrow_return_type = to_arrow_type(
-            return_type, prefers_large_types=use_large_var_types(runner_conf)
+            return_type, prefers_large_types=prefers_large_var_types
         )
         return_type_size = len(return_type)
 
@@ -1757,12 +1757,12 @@ def read_udtf(pickleSer, infile, eval_type):
         import pyarrow as pa
 
         arrow_return_type = to_arrow_type(
-            return_type, prefers_large_types=use_large_var_types(runner_conf)
+            return_type, prefers_large_types=prefers_large_var_types
         )
         return_type_size = len(return_type)
 
         def verify_result(result):
-            if not isinstance(result, pa.RecordBatch):
+            if not isinstance(result, pa.Table):
                 raise PySparkTypeError(
                     errorClass="INVALID_ARROW_UDTF_RETURN_TYPE",
                     messageParameters={
@@ -1776,20 +1776,20 @@ def read_udtf(pickleSer, infile, eval_type):
             # rows or columns. Note that we avoid using `df.empty` here because the
             # result dataframe may contain an empty row. For example, when a UDTF is
             # defined as follows: def eval(self): yield tuple().
-            if len(result) > 0 or len(result.columns) > 0:
-                if len(result.columns) != return_type_size:
+            if result.num_rows > 0 or result.num_columns > 0:
+                if result.num_columns != return_type_size:
                     raise PySparkRuntimeError(
                         errorClass="UDTF_RETURN_SCHEMA_MISMATCH",
                         messageParameters={
                             "expected": str(return_type_size),
-                            "actual": str(len(result.columns)),
+                            "actual": str(result.num_columns),
                             "func": f.__name__,
                         },
                     )
 
                 # Verify the type and the schema of the result.
                 verify_arrow_result(
-                    pa.Table.from_batches([result], schema=pa.schema(list(arrow_return_type))),
+                    result,
                     assign_cols_by_name=False,
                     expected_cols_and_types=[
                         (field.name, field.type) for field in arrow_return_type
@@ -1832,9 +1832,7 @@ def read_udtf(pickleSer, infile, eval_type):
         def convert_to_arrow(data: Iterable):
             data = list(check_return_value(data))
             if len(data) == 0:
-                return [
-                    pa.RecordBatch.from_pylist(data, schema=pa.schema(list(arrow_return_type)))
-                ]
+                return pa.Table.from_pylist(data, schema=pa.schema(list(arrow_return_type)))
 
             def raise_conversion_error(original_exception):
                 raise PySparkRuntimeError(
@@ -1849,7 +1847,7 @@ def read_udtf(pickleSer, infile, eval_type):
             try:
                 return LocalDataToArrowConversion.convert(
                     data, return_type, prefers_large_var_types
-                ).to_batches()
+                )
             except PySparkValueError as e:
                 if e.getErrorClass() == "AXIS_LENGTH_MISMATCH":
                     raise PySparkRuntimeError(
@@ -1871,8 +1869,8 @@ def read_udtf(pickleSer, infile, eval_type):
 
         def evaluate(*args: pa.ChunkedArray):
             if len(args) == 0:
-                for batch in convert_to_arrow(func()):
-                    yield verify_result(batch), arrow_return_type
+                for batch in verify_result(convert_to_arrow(func())).to_batches():
+                    yield batch, arrow_return_type
             else:
                 list_args = list(args)
 
@@ -1883,8 +1881,8 @@ def read_udtf(pickleSer, infile, eval_type):
                     t, schema=schema, return_as_tuples=True
                 )
                 for row in rows:
-                    for batch in convert_to_arrow(func(*row)):
-                        yield verify_result(batch), arrow_return_type
+                    for batch in verify_result(convert_to_arrow(func(*row))).to_batches():
+                        yield batch, arrow_return_type
 
         return evaluate
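
As an aside for readers of this change, the following is a minimal, self-contained sketch of the pattern the commit adopts: verify the UDTF result while it is still a pyarrow Table and only split it into RecordBatches once verification has passed, instead of reassembling a Table from batches just to verify it. The names used here (expected_schema, verify_result, evaluate) are illustrative only and are not the identifiers in pyspark/worker.py.

    # Illustrative sketch only; not the actual pyspark/worker.py code.
    import pyarrow as pa

    # Hypothetical expected return schema of a UDTF.
    expected_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

    def verify_result(result: pa.Table) -> pa.Table:
        # Verification happens while the result is still a Table, so no
        # RecordBatch -> Table conversion is needed just for the check.
        if result.num_columns != len(expected_schema):
            raise RuntimeError(
                f"expected {len(expected_schema)} columns, got {result.num_columns}"
            )
        return result

    def evaluate(rows):
        # Convert the Python rows directly to a Table ...
        table = pa.Table.from_pylist(rows, schema=expected_schema)
        # ... and defer the RecordBatch conversion until after verification.
        for batch in verify_result(table).to_batches():
            yield batch

    for batch in evaluate([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]):
        print(batch.num_rows, batch.schema)

Verifying on the Table means the batches eventually handed downstream are produced exactly once, at the end, which is the roundtrip the commit removes.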