This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f689ff75a037 [SPARK-53939][PYTHON] Use batch.num_columns instead of len(batch.columns)
f689ff75a037 is described below

commit f689ff75a037d90e78312e6d3c826b498300a703
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Oct 17 13:12:33 2025 +0900

    [SPARK-53939][PYTHON] Use batch.num_columns instead of len(batch.columns)
    
    ### What changes were proposed in this pull request?
    
    Use `batch.num_columns` instead of `len(batch.columns)` in `ArrowStreamUDFSerializer`.
    
    ### Why are the changes needed?
    
    `len(batch.columns)` has a performance overhead compared with `batch.num_columns`.
    It should be avoided.
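
    As a rough illustration (not part of the patch), the two access patterns look like this.
    The roughly linear growth of the `len(batch.columns)` timings below is consistent with
    `batch.columns` materializing a fresh list of column arrays on every access, while
    `num_columns` simply reads the stored count.

    ```python
    import pyarrow as pa

    batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])

    # Builds a Python list of Array objects, then takes its length;
    # the cost grows with the number of columns.
    n_slow = len(batch.columns)

    # Reads the column count directly from the batch; effectively O(1).
    n_fast = batch.num_columns
    assert n_slow == n_fast == 1
    ```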
    
    <details>
    <summary>benchmark_column_count_access</summary>
    
    ```python
    import pyarrow as pa
    import time
    import numpy as np
    
    def benchmark_column_count_access():
        # Create test data with varying number of columns
        column_counts = [10, 50, 100, 500, 1000]
        iterations = 1_000_000
    
        print("PyArrow RecordBatch Column Count Access Benchmark")
        print("=" * 70)
        print(f"Iterations: {iterations:,}")
        print()
    
        for num_cols in column_counts:
            # Create a RecordBatch with the specified number of columns
            arrays = [pa.array(np.random.rand(100)) for _ in range(num_cols)]
            names = [f"col_{i}" for i in range(num_cols)]
            batch = pa.record_batch(arrays, names=names)
    
            # Benchmark len(batch.columns)
            start = time.perf_counter()
            for _ in range(iterations):
                _ = len(batch.columns)
            time_len_columns = time.perf_counter() - start
    
            # Benchmark batch.num_columns
            start = time.perf_counter()
            for _ in range(iterations):
                _ = batch.num_columns
            time_num_columns = time.perf_counter() - start
    
            # Calculate speedup
            speedup = time_len_columns / time_num_columns
    
            print(f"Columns: {num_cols:>4}")
            print(f"  len(batch.columns):  {time_len_columns:.4f}s")
            print(f"  batch.num_columns:   {time_num_columns:.4f}s")
            print(f"  Speedup:             {speedup:.2f}x")
            print(f"  Difference:          {(time_len_columns - 
time_num_columns)*1000:.2f}ms")
            print()

    benchmark_column_count_access()
    ```
    
    </details>
    
    ```
    PyArrow RecordBatch Column Count Access Benchmark
    ======================================================================
    Iterations: 1,000,000
    
    Columns:   10
      len(batch.columns):  4.0907s
      batch.num_columns:   0.0215s
      Speedup:             190.38x
      Difference:          4069.23ms
    
    Columns:   50
      len(batch.columns):  19.7623s
      batch.num_columns:   0.0212s
      Speedup:             932.47x
      Difference:          19741.09ms
    
    Columns:  100
      len(batch.columns):  39.7946s
      batch.num_columns:   0.0191s
      Speedup:             2088.70x
      Difference:          39775.53ms
    
    Columns:  500
      len(batch.columns):  205.2582s
      batch.num_columns:   0.0210s
      Speedup:             9783.63x
      Difference:          205237.24ms
    
    Columns: 1000
      len(batch.columns):  447.2963s
      batch.num_columns:   0.0234s
      Speedup:             19085.39x
      Difference:          447272.90ms
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The existing tests should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52639 from ueshin/issues/SPARK-53939/num_columns.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 55fbfe4b34bb..59358578d729 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -168,7 +168,7 @@ class ArrowStreamUDFSerializer(ArrowStreamSerializer):
                 assert isinstance(batch, pa.RecordBatch)
 
                 # Wrap the root struct
-                if len(batch.columns) == 0:
+                if batch.num_columns == 0:
                     # When batch has no column, it should still create
                     # an empty batch with the number of rows set.
                     struct = pa.array([{}] * batch.num_rows)
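
For reference, a minimal standalone sketch (plain PyArrow, outside Spark) of the trick the
touched code path relies on: a struct array built from empty dicts has no fields but still
carries a row count, which is how a zero-column batch keeps its number of rows.

```python
import pyarrow as pa

# Mirrors `pa.array([{}] * batch.num_rows)` from the serializer:
# no fields, but the row count is preserved.
num_rows = 3
struct = pa.array([{}] * num_rows)
assert struct.type == pa.struct([])  # struct with no fields
assert len(struct) == num_rows
```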


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
