This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new f7fa4fefb2e7 [SPARK-57137][PYTHON][TEST] Share base mixin across Arrow/Pandas siblings
f7fa4fefb2e7 is described below

commit f7fa4fefb2e7a349d8549c541d43274b0b345b95
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu May 28 16:59:20 2026 -0700

    [SPARK-57137][PYTHON][TEST] Share base mixin across Arrow/Pandas siblings
    
    ### What changes were proposed in this pull request?
    
    Reduce duplication between Arrow and Pandas sibling mixins in `python/benchmarks/bench_eval_type.py` by making the Pandas variant subclass the Arrow variant, mirroring the existing pattern in which each iter mixin subclasses its non-iter counterpart. Applies to two pairs:
    
    - `_ScalarPandasBenchMixin` now subclasses `_ScalarArrowBenchMixin`
    - `_GroupedAggPandasBenchMixin` now subclasses `_GroupedAggArrowBenchMixin`
    
    The shared `_build_scenario` and `_write_scenario` are pulled up into the Arrow base, with the eval type parameterized via the `_eval_type` class attribute (the Scalar pair already used this; this PR extends the same pattern to GroupedAgg). `_build_scenario` is converted from `staticmethod` to `classmethod` so subclasses read their own `_scenario_configs`.
    
    As a follow-on benefit, `_GroupedAggArrowIterBenchMixin` (already a subclass of the Arrow base) drops its now-redundant copy of `_write_scenario`.
    
    Net diff: +27 / -102 lines.
    
    ### Why are the changes needed?
    
    Before: each sibling pair had two near-identical `_write_scenario` bodies, differing only in the hard-coded `PythonEvalType.SQL_...` constant and the UDF set. Any change to the protocol-writing logic (runner_conf, eval_conf, payload framing) had to be applied in lock-step across both sibling halves -- a known footgun. The pattern already used by the iter subclasses (override `_eval_type` + `_udfs`, inherit everything else) generalizes cleanly to the Arrow/Pandas axis.
    
    The Window and CogroupedMap pairs are intentionally left out of this PR to avoid conflicting with two in-flight PRs (#56167 makes the Window Arrow mixin lazy; #56171 renames `wide_values` to `wide_cols` in the Cogroup pair). Both pairs can be folded into the same base-class pattern in a follow-up once those land.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Test-only change in the benchmark module.
    
    ### How was this patch tested?
    
    - Structural: confirmed `_eval_type` resolves correctly via MRO for all seven affected mixins (Scalar Arrow / Arrow-Iter / Pandas / Pandas-Iter, GroupedAgg Arrow / Arrow-Iter / Pandas).
    - Confirmed `_ScalarPandasBenchMixin._scenario_configs` still holds the pandas-sized row counts (1M `pure_ints`), not the Arrow base's 5M -- the main MRO-resolution risk of switching `_build_scenario` to `classmethod`.
    - Ran `setup` + `time_worker` end-to-end for all 7 `*TimeBench` classes (including both UDF arities for the GroupedAgg variants).
    - Ran `peakmem_worker` for one bench class per pair.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Generated-by: Claude Code (claude-opus-4-7)
    
    Closes #56173 from viirya/SPARK-55724-sibling-base-followup.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
    (cherry picked from commit 40e9a14a087beceab29567be6e47af25b9970635)
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 python/benchmarks/bench_eval_type.py | 129 ++++++++---------------------------
 1 file changed, 27 insertions(+), 102 deletions(-)

diff --git a/python/benchmarks/bench_eval_type.py 
b/python/benchmarks/bench_eval_type.py
index 2646dec501a4..1140ec7db3d6 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -938,11 +938,11 @@ class _GroupedAggArrowBenchMixin:
         "wide_cols": (200, 5_000, 20),
     }
 
-    @staticmethod
-    def _build_scenario(name):
+    @classmethod
+    def _build_scenario(cls, name):
         """Build a single scenario by name."""
         np.random.seed(42)
-        num_groups, rows_per_group, n_cols = 
_GroupedAggArrowBenchMixin._scenario_configs[name]
+        num_groups, rows_per_group, n_cols = cls._scenario_configs[name]
         return MockDataFactory.make_grouped_batches(
             num_groups=num_groups,
             num_rows=rows_per_group,
@@ -951,6 +951,7 @@ class _GroupedAggArrowBenchMixin:
             batch_size=rows_per_group,
         )
 
+    _eval_type = PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF
     _udfs = {
         "sum_udf": _grouped_agg_arrow_sum,
         "mean_multi_udf": _grouped_agg_arrow_mean_multi,
@@ -974,7 +975,7 @@ class _GroupedAggArrowBenchMixin:
             MockProtocolWriter.write_udf_payload(udf_func, return_type, 
arg_offsets, b)
 
         MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+            self._eval_type,
             write_udf,
             lambda b: MockProtocolWriter.write_grouped_data_payload(groups, 
buf=b),
             buf,
@@ -1010,6 +1011,7 @@ class 
_GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin):
             total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 
0)
         return total
 
+    _eval_type = PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF
     _udfs = {
         "sum_udf": _grouped_agg_arrow_iter_sum,
         "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi,
@@ -1017,28 +1019,6 @@ class 
_GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin):
     params = [list(_GroupedAggArrowBenchMixin._scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
-    def _write_scenario(self, scenario, udf_name, buf):
-        groups, _schema = self._build_scenario(scenario)
-        udf_func = self._udfs[udf_name]
-
-        # sum_udf uses 1 arg, mean_multi_udf uses 2 args
-        if "multi" in udf_name:
-            arg_offsets = [0, 1]
-        else:
-            arg_offsets = [0]
-
-        return_type = DoubleType()
-
-        def write_udf(b):
-            MockProtocolWriter.write_udf_payload(udf_func, return_type, 
arg_offsets, b)
-
-        MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
-            write_udf,
-            lambda b: MockProtocolWriter.write_grouped_data_payload(groups, 
buf=b),
-            buf,
-        )
-
 
 class GroupedAggArrowIterUDFTimeBench(_GroupedAggArrowIterBenchMixin, 
_TimeBenchBase):
     pass
@@ -1052,8 +1032,15 @@ class 
GroupedAggArrowIterUDFPeakmemBench(_GroupedAggArrowIterBenchMixin, _Peakme
 # UDF receives ``pd.Series`` columns per group, returns scalar.
 
 
-class _GroupedAggPandasBenchMixin:
-    """Provides _write_scenario for SQL_GROUPED_AGG_PANDAS_UDF."""
+class _GroupedAggPandasBenchMixin(_GroupedAggArrowBenchMixin):
+    """Provides _write_scenario for SQL_GROUPED_AGG_PANDAS_UDF.
+
+    Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+    sibling; only the eval type and the UDFs differ. ``_scenario_configs``
+    is intentionally identical to the Arrow variant for apples-to-apples
+    comparison (the aggregations are cheap enough that pandas conversion
+    is not the bottleneck here).
+    """
 
     def _grouped_agg_pandas_sum(col):
         """Sum a single Pandas Series."""
@@ -1063,56 +1050,14 @@ class _GroupedAggPandasBenchMixin:
         """Mean of two Pandas Series combined."""
         return (col0.mean() or 0) + (col1.mean() or 0)
 
-    _scenario_configs = {
-        "few_groups_sm": (50, 5_000, 5),
-        "few_groups_lg": (50, 50_000, 5),
-        "many_groups_sm": (2_000, 500, 5),
-        "many_groups_lg": (500, 10_000, 5),
-        "wide_cols": (200, 5_000, 20),
-    }
-
-    @staticmethod
-    def _build_scenario(name):
-        """Build a single scenario by name."""
-        np.random.seed(42)
-        num_groups, rows_per_group, n_cols = 
_GroupedAggPandasBenchMixin._scenario_configs[name]
-        return MockDataFactory.make_grouped_batches(
-            num_groups=num_groups,
-            num_rows=rows_per_group,
-            num_cols=n_cols,
-            spark_type_pool=MockDataFactory.NUMERIC_TYPES,
-            batch_size=rows_per_group,
-        )
-
+    _eval_type = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
     _udfs = {
         "sum_udf": _grouped_agg_pandas_sum,
         "mean_multi_udf": _grouped_agg_pandas_mean_multi,
     }
-    params = [list(_scenario_configs), list(_udfs)]
+    params = [list(_GroupedAggArrowBenchMixin._scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
-    def _write_scenario(self, scenario, udf_name, buf):
-        groups, _schema = self._build_scenario(scenario)
-        udf_func = self._udfs[udf_name]
-
-        # sum_udf uses 1 arg, mean_multi_udf uses 2 args
-        if "multi" in udf_name:
-            arg_offsets = [0, 1]
-        else:
-            arg_offsets = [0]
-
-        return_type = DoubleType()
-
-        def write_udf(b):
-            MockProtocolWriter.write_udf_payload(udf_func, return_type, 
arg_offsets, b)
-
-        MockProtocolWriter.write_worker_input(
-            PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
-            write_udf,
-            lambda b: MockProtocolWriter.write_grouped_data_payload(groups, 
buf=b),
-            buf,
-        )
-
 
 class GroupedAggPandasUDFTimeBench(_GroupedAggPandasBenchMixin, 
_TimeBenchBase):
     pass
@@ -1587,11 +1532,11 @@ class _ScalarArrowBenchMixin:
         "mixed_types": ("mixed", 5_000_000, 10, 5_000),
     }
 
-    @staticmethod
-    def _build_scenario(name):
+    @classmethod
+    def _build_scenario(cls, name):
         """Build a single scenario by name."""
         np.random.seed(42)
-        type_key, num_rows, num_cols, batch_size = 
_ScalarArrowBenchMixin._scenario_configs[name]
+        type_key, num_rows, num_cols, batch_size = cls._scenario_configs[name]
         pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
         return MockDataFactory.make_batches(
             num_rows=num_rows,
@@ -1676,8 +1621,13 @@ class 
ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBa
 # Measures the full Arrow-to-Pandas-to-Arrow round-trip.
 
 
-class _ScalarPandasBenchMixin:
-    """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks."""
+class _ScalarPandasBenchMixin(_ScalarArrowBenchMixin):
+    """Mixin for SQL_SCALAR_PANDAS_UDF benchmarks.
+
+    Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+    sibling; only the eval type, the UDFs, and the per-scenario row counts
+    differ (pandas conversion is more expensive, so smaller batches).
+    """
 
     def _scalar_pandas_sort(s):
         return s.sort_values().reset_index(drop=True)
@@ -1697,19 +1647,6 @@ class _ScalarPandasBenchMixin:
         "mixed_types": ("mixed", 1_000_000, 10, 5_000),
     }
 
-    @staticmethod
-    def _build_scenario(name):
-        """Build a single scenario by name."""
-        np.random.seed(42)
-        type_key, num_rows, num_cols, batch_size = 
_ScalarPandasBenchMixin._scenario_configs[name]
-        pool = MockDataFactory.NAMED_TYPE_POOLS[type_key]
-        return MockDataFactory.make_batches(
-            num_rows=num_rows,
-            num_cols=num_cols,
-            spark_type_pool=pool,
-            batch_size=batch_size,
-        )
-
     _eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
     # ret_type=None means "use schema.fields[0].dataType from the scenario"
     _udfs = {
@@ -1720,18 +1657,6 @@ class _ScalarPandasBenchMixin:
     params = [list(_scenario_configs), list(_udfs)]
     param_names = ["scenario", "udf"]
 
-    def _write_scenario(self, scenario, udf_name, buf):
-        batches, schema = self._build_scenario(scenario)
-        udf_func, ret_type, arg_offsets = self._udfs[udf_name]
-        if ret_type is None:
-            ret_type = schema.fields[0].dataType
-        MockProtocolWriter.write_worker_input(
-            self._eval_type,
-            lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, 
arg_offsets, b),
-            lambda b: MockProtocolWriter.write_data_payload(iter(batches), b),
-            buf,
-        )
-
 
 class ScalarPandasUDFTimeBench(_ScalarPandasBenchMixin, _TimeBenchBase):
     pass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to