This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 537f8136c571 [SPARK-52861][PYTHON] Skip Row object creation in Arrow-optimized UDTF execution
537f8136c571 is described below

commit 537f8136c5717b60c52030d6c5204f194818a397
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Fri Jul 18 13:23:49 2025 -0700

[SPARK-52861][PYTHON] Skip Row object creation in Arrow-optimized UDTF execution

### What changes were proposed in this pull request?

Skips `Row` object creation in Arrow-optimized UDTF execution.

### Why are the changes needed?

`Row` objects are currently created during Arrow-optimized UDTF execution, but building them is expensive and not necessary.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing tests, and manual benchmarks.

```py
def profile(f, *args, _n=10, **kwargs):
    import cProfile
    import pstats
    import gc

    st = None
    for _ in range(5):
        f(*args, **kwargs)
    for _ in range(_n):
        gc.collect()
        with cProfile.Profile() as pr:
            ret = f(*args, **kwargs)
        if st is None:
            st = pstats.Stats(pr)
        else:
            st.add(pstats.Stats(pr))
    st.sort_stats("time", "cumulative").print_stats()
    return ret


from pyspark.sql.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion
from pyspark.sql.types import *

data = [
    (i if i % 1000 else None, str(i))
    for i in range(1000000)
]
schema = (
    StructType()
    .add("i", IntegerType(), nullable=True)
    .add("s", StringType(), nullable=True)
)


def to_arrow():
    return LocalDataToArrowConversion.convert(data, schema, use_large_var_types=False)


def from_arrow(tbl, return_as_tuples):
    return ArrowTableToRowsConversion.convert(tbl, schema, return_as_tuples=return_as_tuples)


tbl = to_arrow()

profile(from_arrow, tbl, return_as_tuples=False)
profile(from_arrow, tbl, return_as_tuples=True)
```

- before (`return_as_tuples=False`)

```
60655810 function calls in 14.112 seconds
```

- after (`return_as_tuples=True`)

```
20328060 function calls in 5.613 seconds
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51546 from ueshin/issues/SPARK-52861/skip_row_creation.
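[Editor's note, not part of the commit message: the effect measured above can be reproduced without a Spark session. The minimal sketch below contrasts the cost of building plain tuples versus `pyspark.sql.Row` objects for the same columnar data; the row count, helper names, and loop structure are illustrative assumptions, not code from this patch.]

```py
# Illustrative micro-benchmark: plain tuples vs. pyspark.sql.Row objects.
# Only the relative cost matters; absolute timings depend on the machine.
import timeit

from pyspark.sql import Row

# Two "columns" of data, roughly mirroring the (int, str) schema in the benchmark above.
cols = (list(range(100_000)), [str(i) for i in range(100_000)])


def as_tuples():
    # Cheap path: one tuple per row.
    return [tuple(vals) for vals in zip(*cols)]


def as_rows():
    # Expensive path: a Row object per row, with field names attached.
    R = Row("i", "s")  # Row factory with fixed field names
    return [R(*vals) for vals in zip(*cols)]


print("tuples:", timeit.timeit(as_tuples, number=10))
print("Rows:  ", timeit.timeit(as_rows, number=10))
```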
Authored-by: Takuya Ueshin <ues...@databricks.com>
Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 python/pyspark/sql/conversion.py | 32 +++++++++++++++++++++++++++-----
 python/pyspark/worker.py         |  5 +++--
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index bf36522a423a..6217ffdf9862 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -18,7 +18,7 @@
 import array
 import datetime
 import decimal
-from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, overload
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union, overload
 
 from pyspark.errors import PySparkValueError
 from pyspark.sql.pandas.types import _dedup_names, _deduplicate_field_names, to_arrow_schema
@@ -446,7 +446,7 @@ class LocalDataToArrowConversion:
 
             return pa.Table.from_arrays(pylist, schema=pa_schema)
         else:
-            return pa.table({"_": [None] * len(rows)}).drop("_")
+            return pa.Table.from_struct_array(pa.array([{}] * len(rows)))
 
 
 class ArrowTableToRowsConversion:
@@ -687,8 +687,24 @@ class ArrowTableToRowsConversion:
         else:
             return lambda value: value
 
+    @overload
+    @staticmethod
+    def convert(  # type: ignore[overload-overlap]
+        table: "pa.Table", schema: StructType
+    ) -> List[Row]:
+        pass
+
+    @overload
     @staticmethod
-    def convert(table: "pa.Table", schema: StructType) -> List[Row]:
+    def convert(
+        table: "pa.Table", schema: StructType, *, return_as_tuples: bool = True
+    ) -> List[tuple]:
+        pass
+
+    @staticmethod  # type: ignore[misc]
+    def convert(
+        table: "pa.Table", schema: StructType, *, return_as_tuples: bool = False
+    ) -> List[Union[Row, tuple]]:
         require_minimum_pyarrow_version()
 
         import pyarrow as pa
@@ -709,8 +725,14 @@ class ArrowTableToRowsConversion:
                 for column, conv in zip(table.columns, field_converters)
             ]
 
-            rows = [_create_row(fields, tuple(cols)) for cols in zip(*columnar_data)]
+            if return_as_tuples:
+                rows = [tuple(cols) for cols in zip(*columnar_data)]
+            else:
+                rows = [_create_row(fields, tuple(cols)) for cols in zip(*columnar_data)]
             assert len(rows) == table.num_rows, f"{len(rows)}, {table.num_rows}"
             return rows
         else:
-            return [_create_row(fields, tuple())] * table.num_rows
+            if return_as_tuples:
+                return [tuple()] * table.num_rows
+            else:
+                return [_create_row(fields, tuple())] * table.num_rows
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 5febf16fa3aa..d839cf00eb0a 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1694,9 +1694,10 @@ def read_udtf(pickleSer, infile, eval_type):
                 names = [f"_{n}" for n in range(len(list_args))]
                 t = pa.Table.from_arrays(list_args, names=names)
                 schema = from_arrow_schema(t.schema, prefers_large_var_types)
-                rows = ArrowTableToRowsConversion.convert(t, schema=schema)
+                rows = ArrowTableToRowsConversion.convert(
+                    t, schema=schema, return_as_tuples=True
+                )
                 for row in rows:
-                    row = tuple(row)  # type: ignore[assignment]
                     for batch in convert_to_arrow(func(*row)):
                         yield verify_result(batch), arrow_return_type
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org