This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 24a42292c0ab [SPARK-57137][PYTHON][TEST] Share base mixin across Arrow/Pandas siblings
24a42292c0ab is described below
commit 24a42292c0aba5cc01c56e1651dd447a3b60f13a
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu May 28 16:59:20 2026 -0700
[SPARK-57137][PYTHON][TEST] Share base mixin across Arrow/Pandas siblings
### What changes were proposed in this pull request?
Reduce duplication between Arrow and Pandas sibling mixins in
`python/benchmarks/bench_eval_type.py` by making the Pandas variant subclass
the Arrow variant, mirroring the existing iter-subclasses-noniter pattern.
Applies to two pairs:
- `_ScalarPandasBenchMixin` now subclasses `_ScalarArrowBenchMixin`
- `_GroupedAggPandasBenchMixin` now subclasses `_GroupedAggArrowBenchMixin`
The shared `_build_scenario` and `_write_scenario` are pulled up into the
Arrow base, with the eval type parameterized via the `_eval_type` class
attribute (the Scalar pair already used this; this PR extends the same pattern
to GroupedAgg). `_build_scenario` is converted from `staticmethod` to
`classmethod` so subclasses read their own `_scenario_configs`.
As a follow-on benefit, `_GroupedAggArrowIterBenchMixin` (already a
subclass of the Arrow base) drops its now-redundant copy of `_write_scenario`.
Net diff: +27 / -102 lines.
### Why are the changes needed?
Before: each sibling pair had two near-identical `_write_scenario` bodies,
differing only in the hard-coded `PythonEvalType.SQL_...` constant and the UDF
set. Any change to the protocol-writing logic (runner_conf, eval_conf, payload
framing) had to be applied in lock-step across both sibling halves -- a known
footgun. The pattern already used by the iter subclasses (override `_eval_type`
+ `_udfs`, inherit everything else) generalizes cleanly to the Arrow/Pandas
axis.
The Window and CogroupedMap pairs are intentionally left out of this PR to
avoid conflicting with two in-flight PRs (#56167 makes the Window Arrow mixin
lazy; #56171 renames `wide_values` to `wide_cols` in the Cogroup pair). Both
pairs can be folded into the same base-class pattern in a follow-up once those
land.
### Does this PR introduce _any_ user-facing change?
No. Test-only change in the benchmark module.
### How was this patch tested?
- Structural: confirmed `_eval_type` resolves correctly via MRO for all
seven affected mixins (Scalar Arrow / Arrow-Iter / Pandas / Pandas-Iter,
GroupedAgg Arrow / Arrow-Iter / Pandas).
- Confirmed `_ScalarPandasBenchMixin._scenario_configs` still holds the
pandas-sized row counts (1M `pure_ints`), not the Arrow base's 5M -- the main
MRO-resolution risk of switching `_build_scenario` to `classmethod`.
- Ran `setup` + `time_worker` end-to-end for all 7 `*TimeBench` classes
(including both UDF arities for the GroupedAgg variants).
- Ran `peakmem_worker` for one bench class per pair.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude Code (claude-opus-4-7)
Closes #56173 from viirya/SPARK-55724-sibling-base-followup.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 40e9a14a087beceab29567be6e47af25b9970635)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
python/benchmarks/bench_eval_type.py | 129 ++++++++---------------------------
1 file changed, 27 insertions(+), 102 deletions(-)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index dc73d5e4a8ec..6cd99d9cbf0f 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -680,11 +680,11 @@ class _GroupedAggArrowBenchMixin:
"wide_cols": (200, 5_000, 20),
}
- @staticmethod
- def _build_scenario(name):
+ @classmethod
+ def _build_scenario(cls, name):
"""Build a single scenario by name."""
np.random.seed(42)
- num_groups, rows_per_group, n_cols = _GroupedAggArrowBenchMixin._scenario_configs[name]
+ num_groups, rows_per_group, n_cols = cls._scenario_configs[name]
return MockDataFactory.make_grouped_batches(
num_groups=num_groups,
num_rows=rows_per_group,
@@ -693,6 +693,7 @@ class _GroupedAggArrowBenchMixin:
batch_size=rows_per_group,
)
+ _eval_type = PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF
_udfs = {
"sum_udf": _grouped_agg_arrow_sum,
"mean_multi_udf": _grouped_agg_arrow_mean_multi,
@@ -716,7 +717,7 @@ class _GroupedAggArrowBenchMixin:
MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)
MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+ self._eval_type,
write_udf,
lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
buf,
@@ -752,6 +753,7 @@ class _GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin):
total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0)
return total
+ _eval_type = PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF
_udfs = {
"sum_udf": _grouped_agg_arrow_iter_sum,
"mean_multi_udf": _grouped_agg_arrow_iter_mean_multi,
@@ -759,28 +761,6 @@ class _GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin):
params = [list(_GroupedAggArrowBenchMixin._scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
- def _write_scenario(self, scenario, udf_name, buf):
- groups, _schema = self._build_scenario(scenario)
- udf_func = self._udfs[udf_name]
-
- # sum_udf uses 1 arg, mean_multi_udf uses 2 args
- if "multi" in udf_name:
- arg_offsets = [0, 1]
- else:
- arg_offsets = [0]
-
- return_type = DoubleType()
-
- def write_udf(b):
- MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)
-
- MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
- write_udf,
- lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
- buf,
- )
-
class GroupedAggArrowIterUDFTimeBench(_GroupedAggArrowIterBenchMixin, _TimeBenchBase):
pass
@@ -794,8 +774,15 @@ class GroupedAggArrowIterUDFPeakmemBench(_GroupedAggArrowIterBenchMixin, _Peakme
# UDF receives ``pd.Series`` columns per group, returns scalar.
-class _GroupedAggPandasBenchMixin:
- """Provides _write_scenario for SQL_GROUPED_AGG_PANDAS_UDF."""
+class _GroupedAggPandasBenchMixin(_GroupedAggArrowBenchMixin):
+ """Provides _write_scenario for SQL_GROUPED_AGG_PANDAS_UDF.
+
+ Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+ sibling; only the eval type and the UDFs differ. ``_scenario_configs``
+ is intentionally identical to the Arrow variant for apples-to-apples
+ comparison (the aggregations are cheap enough that pandas conversion
+ is not the bottleneck here).
+ """
def _grouped_agg_pandas_sum(col):
"""Sum a single Pandas Series."""
@@ -805,56 +792,14 @@ class _GroupedAggPandasBenchMixin:
"""Mean of two Pandas Series combined."""
return (col0.mean() or 0) + (col1.mean() or 0)
- _scenario_configs = {
- "few_groups_sm": (50, 5_000, 5),
- "few_groups_lg": (50, 50_000, 5),
- "many_groups_sm": (2_000, 500, 5),
- "many_groups_lg": (500, 10_000, 5),
- "wide_cols": (200, 5_000, 20),
- }
-
- @staticmethod
- def _build_scenario(name):
- """Build a single scenario by name."""
- np.random.seed(42)
- num_groups, rows_per_group, n_cols = _GroupedAggPandasBenchMixin._scenario_configs[name]
- return MockDataFactory.make_grouped_batches(
- num_groups=num_groups,
- num_rows=rows_per_group,
- num_cols=n_cols,
- spark_type_pool=MockDataFactory.NUMERIC_TYPES,
- batch_size=rows_per_group,
- )
-
+ _eval_type = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
_udfs = {
"sum_udf": _grouped_agg_pandas_sum,
"mean_multi_udf": _grouped_agg_pandas_mean_multi,
}
- params = [list(_scenario_configs), list(_udfs)]
+ params = [list(_GroupedAggArrowBenchMixin._scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
- def _write_scenario(self, scenario, udf_name, buf):
- groups, _schema = self._build_scenario(scenario)
- udf_func = self._udfs[udf_name]
-
- # sum_udf uses 1 arg, mean_multi_udf uses 2 args
- if "multi" in udf_name:
- arg_offsets = [0, 1]
- else:
- arg_offsets = [0]
-
- return_type = DoubleType()
-
- def write_udf(b):
- MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)
-
- MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
- write_udf,
- lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
- buf,
- )
-
class GroupedAggPandasUDFTimeBench(_GroupedAggPandasBenchMixin, _TimeBenchBase):
pass
@@ -1329,11 +1274,11 @@ class _ScalarArrowBenchMixin:
"mixed_types": ("mixed", 5_000_000, 10, 5_000),
}
- @staticmethod
- def _build_scenario(name):
+ @classmethod
+ def _build_scenario(cls, name):
"""Build a single scenario by name."""
np.random.seed(42)
- type_key, num_rows, num_cols, batch_size = _ScalarArrowBenchMixin._scenario_configs[name]
+ type_key, num_rows, num_cols, batch_size = cls._scenario_configs[name]
pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
return MockDataFactory.make_batches(
num_rows=num_rows,
@@ -1418,8 +1363,13 @@ class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBa
# Measures the full Arrow-to-Pandas-to-Arrow round-trip.
-class _ScalarPandasBenchMixin:
- """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks."""
+class _ScalarPandasBenchMixin(_ScalarArrowBenchMixin):
+ """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks.
+
+ Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+ sibling; only the eval type, the UDFs, and the per-scenario row counts
+ differ (pandas conversion is more expensive, so smaller batches).
+ """
def _scalar_pandas_sort(s):
return s.sort_values().reset_index(drop=True)
@@ -1439,19 +1389,6 @@ class _ScalarPandasBenchMixin:
"mixed_types": ("mixed", 1_000_000, 10, 5_000),
}
- @staticmethod
- def _build_scenario(name):
- """Build a single scenario by name."""
- np.random.seed(42)
- type_key, num_rows, num_cols, batch_size = _ScalarPandasBenchMixin._scenario_configs[name]
- pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
- return MockDataFactory.make_batches(
- num_rows=num_rows,
- num_cols=num_cols,
- spark_type_pool=pool,
- batch_size=batch_size,
- )
-
_eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
# ret_type=None means "use schema.fields[0].dataType from the scenario"
_udfs = {
@@ -1462,18 +1399,6 @@ class _ScalarPandasBenchMixin:
params = [list(_scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
- def _write_scenario(self, scenario, udf_name, buf):
- batches, schema = self._build_scenario(scenario)
- udf_func, ret_type, arg_offsets = self._udfs[udf_name]
- if ret_type is None:
- ret_type = schema.fields[0].dataType
- MockProtocolWriter.write_worker_input(
- self._eval_type,
- lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),
- lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
- buf,
- )
-
class ScalarPandasUDFTimeBench(_ScalarPandasBenchMixin, _TimeBenchBase):
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]