This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 537f8136c571 [SPARK-52861][PYTHON] Skip Row object creation in Arrow-optimized UDTF execution
537f8136c571 is described below

commit 537f8136c5717b60c52030d6c5204f194818a397
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Fri Jul 18 13:23:49 2025 -0700

    [SPARK-52861][PYTHON] Skip Row object creation in Arrow-optimized UDTF execution
    
    ### What changes were proposed in this pull request?
    
    Skips `Row` object creation in Arrow-optimized UDTF execution.
    
    ### Why are the changes needed?
    
    `Row` objects are created in Arrow-optimized UDTF execution even though building them is expensive and not necessary there.
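
    The per-row overhead this change avoids can be seen with a small standalone snippet like the following (illustrative only, not part of this patch; the names and run counts are made up for illustration):

    ```py
    # Illustrative only: build plain tuples vs. pyspark Row objects for the same values.
    import timeit
    from pyspark.sql.types import Row

    RowFactory = Row("i", "s")  # Row factory using the same field names as the benchmark below
    values = [(i, str(i)) for i in range(100_000)]

    as_tuples = timeit.timeit(lambda: [tuple(v) for v in values], number=10)
    as_rows = timeit.timeit(lambda: [RowFactory(*v) for v in values], number=10)
    print(f"tuples: {as_tuples:.3f}s, Rows: {as_rows:.3f}s")
    ```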
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The existing tests and manual benchmarks.
    
    ```py
    def profile(f, *args, _n=10, **kwargs):
        import cProfile
        import pstats
        import gc
        st = None
        # warm-up runs before profiling
        for _ in range(5):
            f(*args, **kwargs)
        # profile _n runs and accumulate the stats
        for _ in range(_n):
            gc.collect()
            with cProfile.Profile() as pr:
                ret = f(*args, **kwargs)
            if st is None:
                st = pstats.Stats(pr)
            else:
                st.add(pstats.Stats(pr))
        st.sort_stats("time", "cumulative").print_stats()
        return ret
    
    from pyspark.sql.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion
    from pyspark.sql.types import *
    
    data = [
        (i if i % 1000 else None, str(i))
        for i in range(1000000)
    ]
    schema = (
        StructType()
        .add("i", IntegerType(), nullable=True)
        .add("s", StringType(), nullable=True)
    )
    
    def to_arrow():
        return LocalDataToArrowConversion.convert(data, schema, use_large_var_types=False)
    
    def from_arrow(tbl, return_as_tuples):
        return ArrowTableToRowsConversion.convert(tbl, schema, return_as_tuples=return_as_tuples)
    
    tbl = to_arrow()
    
    profile(from_arrow, tbl, return_as_tuples=False)
    profile(from_arrow, tbl, return_as_tuples=True)
    ```
    
    - before (`return_as_tuples=False`)
    
    ```
    60655810 function calls in 14.112 seconds
    ```
    
    - after (`return_as_tuples=True`)
    
    ```
    20328060 function calls in 5.613 seconds
    ```
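
    For reference, a quick sketch of the two return shapes, reusing `tbl` and `from_arrow` from the snippet above (the printed values assume the benchmark data defined there):

    ```py
    # Row objects vs. plain tuples for the same Arrow table.
    rows = from_arrow(tbl, return_as_tuples=False)
    print(rows[1])    # Row(i=1, s='1')

    tuples = from_arrow(tbl, return_as_tuples=True)
    print(tuples[1])  # (1, '1')
    ```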
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51546 from ueshin/issues/SPARK-52861/skip_row_creation.
    
    Authored-by: Takuya Ueshin <ues...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 python/pyspark/sql/conversion.py | 32 +++++++++++++++++++++++++++-----
 python/pyspark/worker.py         |  5 +++--
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index bf36522a423a..6217ffdf9862 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -18,7 +18,7 @@
 import array
 import datetime
 import decimal
-from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, overload
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union, overload
 
 from pyspark.errors import PySparkValueError
 from pyspark.sql.pandas.types import _dedup_names, _deduplicate_field_names, to_arrow_schema
@@ -446,7 +446,7 @@ class LocalDataToArrowConversion:
 
             return pa.Table.from_arrays(pylist, schema=pa_schema)
         else:
-            return pa.table({"_": [None] * len(rows)}).drop("_")
+            return pa.Table.from_struct_array(pa.array([{}] * len(rows)))
 
 
 class ArrowTableToRowsConversion:
@@ -687,8 +687,24 @@ class ArrowTableToRowsConversion:
             else:
                 return lambda value: value
 
+    @overload
+    @staticmethod
+    def convert(  # type: ignore[overload-overlap]
+        table: "pa.Table", schema: StructType
+    ) -> List[Row]:
+        pass
+
+    @overload
     @staticmethod
-    def convert(table: "pa.Table", schema: StructType) -> List[Row]:
+    def convert(
+        table: "pa.Table", schema: StructType, *, return_as_tuples: bool = True
+    ) -> List[tuple]:
+        pass
+
+    @staticmethod  # type: ignore[misc]
+    def convert(
+        table: "pa.Table", schema: StructType, *, return_as_tuples: bool = 
False
+    ) -> List[Union[Row, tuple]]:
         require_minimum_pyarrow_version()
         import pyarrow as pa
 
@@ -709,8 +725,14 @@ class ArrowTableToRowsConversion:
                 for column, conv in zip(table.columns, field_converters)
             ]
 
-            rows = [_create_row(fields, tuple(cols)) for cols in zip(*columnar_data)]
+            if return_as_tuples:
+                rows = [tuple(cols) for cols in zip(*columnar_data)]
+            else:
+                rows = [_create_row(fields, tuple(cols)) for cols in zip(*columnar_data)]
             assert len(rows) == table.num_rows, f"{len(rows)}, {table.num_rows}"
             return rows
         else:
-            return [_create_row(fields, tuple())] * table.num_rows
+            if return_as_tuples:
+                return [tuple()] * table.num_rows
+            else:
+                return [_create_row(fields, tuple())] * table.num_rows
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 5febf16fa3aa..d839cf00eb0a 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1694,9 +1694,10 @@ def read_udtf(pickleSer, infile, eval_type):
                     names = [f"_{n}" for n in range(len(list_args))]
                     t = pa.Table.from_arrays(list_args, names=names)
                     schema = from_arrow_schema(t.schema, prefers_large_var_types)
-                    rows = ArrowTableToRowsConversion.convert(t, schema=schema)
+                    rows = ArrowTableToRowsConversion.convert(
+                        t, schema=schema, return_as_tuples=True
+                    )
                     for row in rows:
-                        row = tuple(row)  # type: ignore[assignment]
                         for batch in convert_to_arrow(func(*row)):
                             yield verify_result(batch), arrow_return_type
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
