This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c18fc072b05 [SPARK-49365][PS] Simplify the bucket aggregation in hist plot
0c18fc072b05 is described below
commit 0c18fc072b05671bc9c74a43de49b563a1ef7907
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sat Aug 24 16:34:48 2024 +0800
[SPARK-49365][PS] Simplify the bucket aggregation in hist plot
### What changes were proposed in this pull request?
Simplify the bucket aggregation in hist plot
### Why are the changes needed?
To simplify the implementation by eliminating the union of multiple DataFrames.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI and manual checks.
### Was this patch authored or co-authored using generative AI tooling?
No
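For readers unfamiliar with `posexplode`, here is a plain-Python analogue (hypothetical, Spark-free, for illustration only) of what `F.posexplode(F.array(...))` produces per row: one `(position, value)` pair per column. That single pass is what replaces the old per-column loop and its repeated `union` calls.

```python
# Plain-Python analogue (illustration only, not part of the patch) of
# F.posexplode(F.array(cols)): for each row, emit one (position, value)
# pair per column, so every column can be bucketed in one pass instead
# of building one DataFrame per column and union-ing them together.
def posexplode_row(values):
    return [(pos, value) for pos, value in enumerate(values)]

rows = [[1.5, 2.5], [0.5, 3.5]]
exploded = [pair for row in rows for pair in posexplode_row(row)]
# each pair plays the role of (__group_id, __value) in the patch
```

The positions line up with the order of `colnames`, which is why the patch can use the exploded position directly as `__group_id`.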
Closes #47852 from zhengruifeng/plot_parallel_hist.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/plot/core.py | 29 +++++++++++------------------
1 file changed, 11 insertions(+), 18 deletions(-)
diff --git a/python/pyspark/pandas/plot/core.py b/python/pyspark/pandas/plot/core.py
index 3ec78100abe9..e5db0bd701f1 100644
--- a/python/pyspark/pandas/plot/core.py
+++ b/python/pyspark/pandas/plot/core.py
@@ -198,25 +198,18 @@ class HistogramPlotBase(NumericPlotBase):
             idx = bisect.bisect(bins, value) - 1
             return float(idx)
 
-        output_df = None
-        for group_id, (colname, bucket_name) in enumerate(zip(colnames, bucket_names)):
-            # sdf.na.drop to match handleInvalid="skip" in Bucketizer
-
-            bucket_df = sdf.na.drop(subset=[colname]).withColumn(
-                bucket_name,
-                binary_search_for_buckets(F.col(colname).cast("double")),
+        output_df = (
+            sdf.select(
+                F.posexplode(
+                    F.array([F.col(colname).cast("double") for colname in colnames])
+                ).alias("__group_id", "__value")
             )
-
-            if output_df is None:
-                output_df = bucket_df.select(
-                    F.lit(group_id).alias("__group_id"), F.col(bucket_name).alias("__bucket")
-                )
-            else:
-                output_df = output_df.union(
-                    bucket_df.select(
-                        F.lit(group_id).alias("__group_id"), F.col(bucket_name).alias("__bucket")
-                    )
-                )
+            # to match handleInvalid="skip" in Bucketizer
+            .where(F.col("__value").isNotNull() & ~F.col("__value").isNaN()).select(
+                F.col("__group_id"),
+                binary_search_for_buckets(F.col("__value")).alias("__bucket"),
+            )
+        )
 
         # 2. Calculate the count based on each group and bucket.
         # +----------+-------+------+
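As a side note, the `binary_search_for_buckets` lookup that the patch keeps boils down to `bisect.bisect` over the bin edges. A standalone sketch (plain Python, outside Spark; `bucket_index` is a hypothetical name for illustration):

```python
import bisect

# Standalone sketch of the bucket lookup in the hunk above:
# bisect.bisect returns the insertion point to the RIGHT of `value`
# among the sorted bin edges, so subtracting 1 yields the zero-based
# index of the bucket whose interval contains the value.
def bucket_index(bins, value):
    idx = bisect.bisect(bins, value) - 1
    return float(idx)

# bins = [0.0, 1.0, 2.0, 3.0] defines three buckets with edges at
# 0.0/1.0, 1.0/2.0, and 2.0/3.0; a value of 1.5 lands in bucket 1.
```

Because NaN and null rows are filtered out by the `.where(...)` clause before this lookup runs (matching `handleInvalid="skip"` in `Bucketizer`), the function only ever sees valid doubles.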
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]