This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f612cd0ada2a [SPARK-37711][PS] Reduce pandas describe job count from O(N) to O(1)
f612cd0ada2a is described below

commit f612cd0ada2a9fd5b38157d8c7be40e2e3b323de
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon Mar 2 14:25:22 2026 +0800

    [SPARK-37711][PS] Reduce pandas describe job count from O(N) to O(1)
    
    I generated benchmarks for the new implementation and compared them against the old implementation. The performance numbers are shown below.
    
    - **Row counts:** 1,000 and 10,000
    - **Column counts:** 2, 5, 10, 20, 40, 100
    - **Data distribution:** Random uniform distribution over 10 distinct values per column
    - **Total tests:** 11 configurations
    
    | Rows    | Columns | Old Time | New Time | Speedup | Time Saved | Improvement | Jobs (Old→New) | Jobs Saved |
    |---------|---------|----------|----------|---------|------------|-------------|----------------|------------|
    | 1,000   | **1**   | **0.125s** | **0.188s** | **0.66x** | **-0.063s** | **-50.6%** | **2 → 3** | **-1** |
    | 1,000   | 2       | 0.226s   | 0.233s   | 0.97x   | -0.007s    | -2.9%       | 4 → 3          | 1          |
    | 1,000   | 5       | 0.501s   | 0.225s   | 2.23x   | 0.276s     | 55.1%       | 10 → 3         | 7          |
    | 1,000   | 10      | 0.861s   | 0.351s   | 2.46x   | 0.511s     | 59.3%       | 20 → 3         | 17         |
    | 1,000   | 20      | 1.539s   | 0.418s   | 3.68x   | 1.120s     | 72.8%       | 40 → 3         | 37         |
    | 1,000   | 40      | 3.176s   | 0.514s   | 6.18x   | 2.662s     | 83.8%       | 80 → 3         | 77         |
    | 1,000   | 100     | 7.483s   | 0.586s   | 12.77x  | 6.897s     | 92.2%       | 200 → 3        | 197        |
    | 10,000  | **1**   | **0.073s** | **0.111s** | **0.66x** | **-0.038s** | **-51.9%** | **2 → 3** | **-1** |
    | 10,000  | 5       | 0.362s   | 0.194s   | 1.87x   | 0.168s     | 46.5%       | 10 → 3         | 7          |
    | 10,000  | 10      | 1.446s   | 0.257s   | 5.61x   | 1.188s     | 82.2%       | 20 → 3         | 17         |
    | 10,000  | 20      | 1.424s   | 0.382s   | 3.72x   | 1.041s     | 73.1%       | 40 → 3         | 37         |
    | 10,000  | 40      | 3.171s   | 0.521s   | 6.09x   | 2.650s     | 83.6%       | 80 → 3         | 77         |
    | 10,000  | 100     | 10.953s  | 1.163s   | 9.41x   | 9.789s     | 89.4%       | 200 → 3        | 197        |
    
    **Aggregate Statistics:**
    - Average speedup: 4.33x
    - Average improvement: 48.7%
    - Average jobs saved: 54.2 per operation
    - Maximum speedup: 12.77x (100 columns)
    - **Regression case: 0.66x for a single column** (new approach is ~50% slower there)
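
    The benchmark setup described above can be sketched roughly as follows. This is a hypothetical reconstruction, not the actual benchmark script from the PR; the `make_frame` helper and the seed are illustrative only.

    ```python
    # Hypothetical sketch of the benchmark data generator: each column draws
    # uniformly from 10 distinct string values, matching the setup above.
    import numpy as np
    import pandas as pd

    def make_frame(n_rows: int, n_cols: int, n_values: int = 10) -> pd.DataFrame:
        rng = np.random.default_rng(seed=42)  # illustrative seed
        return pd.DataFrame(
            {f"c{i}": rng.integers(0, n_values, size=n_rows).astype(str) for i in range(n_cols)}
        )

    df = make_frame(1_000, 5)  # one of the benchmarked configurations
    ```

    In the actual benchmark such a frame would be converted to a pandas-on-Spark DataFrame before timing `describe`.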
    
    ### What changes were proposed in this pull request?
    
    Fixes `describe` for string-only DataFrames so that it runs a fixed number of Spark jobs rather than one job per column.
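    
    For context, the `count`/`unique`/`top`/`freq` output that this code path computes mirrors plain pandas `describe` on object columns. A small plain-pandas illustration (not pandas-on-Spark, but the output shape is the same):
    
    ```python
    # Plain-pandas illustration of describe() on string-only data; the
    # pandas-on-Spark code path optimized here produces the same four rows.
    import pandas as pd

    df = pd.DataFrame(
        {"fruit": ["apple", "apple", "banana"], "color": ["red", "red", "red"]}
    )
    desc = df.describe()
    # Index is ["count", "unique", "top", "freq"]; e.g. top of "fruit" is
    # "apple" with freq 2.
    ```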
    
    ### Why are the changes needed?
    
    Performance
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored-by: Claude Sonnet 4.5
    
    Closes #54370 from devin-petersohn/devin/describe_strings_oneshot.
    
    Authored-by: Devin Petersohn <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/pandas/frame.py | 36 +++++++++++++++++++++++++-----------
 1 file changed, 25 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 4e85c6b73301..f3072c2f4fec 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -10015,7 +10015,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             ]
             sdf = internal.spark_frame.select(*exprs_string)
 
-            # Get `count` & `unique` for each columns
+            # Get `count` & `unique` for each column
             counts, uniques = map(lambda x: x[1:], sdf.summary("count", "count_distinct").take(2))
             # Handling Empty DataFrame
             if len(counts) == 0 or counts[0] == "0":
@@ -10024,16 +10024,30 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                     data[psser.name] = [0, 0, np.nan, np.nan]
                 return DataFrame(data, index=["count", "unique", "top", "freq"])
 
-            # Get `top` & `freq` for each columns
-            tops = []
-            freqs = []
-            # TODO(SPARK-37711): We should do it in single pass since invoking Spark job
-            #   for every columns is too expensive.
-            for column in exprs_string:
-                top, freq = sdf.groupby(column).count().sort("count", ascending=False).first()
-                tops.append(str(top))
-                freqs.append(str(freq))
-
+            if len(exprs_string) == 1:
+                # Fast path for single column (e.g. Series.describe): avoid unpivot overhead.
+                top, freq = (
+                    sdf.groupby(exprs_string[0]).count().sort("count", ascending=False).first()
+                )
+                tops = [str(top)]
+                freqs = [str(freq)]
+            else:
+                # Get `top` & `freq` for each column in a single pass.
+                # Unpivot all string columns into (idx, str_value) pairs using posexplode,
+                # then find the most frequent value per column via the struct min trick.
+                # The negative count ensures min(struct(neg_count, str_value)) picks the
+                # highest count; among ties, the alphabetically first value (matching pandas).
+                rows = (
+                    sdf.select(F.posexplode(F.array(*exprs_string)).alias("idx", "str_value"))
+                    .groupby("idx", "str_value")
+                    .agg(F.negative(F.count("*")).alias("neg_count"))
+                    .groupby("idx")
+                    .agg(F.min(F.struct("neg_count", "str_value")).alias("s"))
+                    .sort("idx")
+                    .collect()
+                )
+                tops = [str(r.s.str_value) for r in rows]
+                freqs = [str(-r.s.neg_count) for r in rows]
             stats = [counts, uniques, tops, freqs]
             stats_names = ["count", "unique", "top", "freq"]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
