This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 481f8a96d974 [SPARK-57138][PYTHON][TEST] Share base mixin across
Window and Cogroup Arrow/Pandas siblings
481f8a96d974 is described below
commit 481f8a96d9743b39165b56154b1fa3fc937960e6
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri May 29 00:34:23 2026 -0700
[SPARK-57138][PYTHON][TEST] Share base mixin across Window and Cogroup
Arrow/Pandas siblings
### What changes were proposed in this pull request?
Follow-up to SPARK-57137. Extend the Arrow/Pandas sibling base-mixin
pattern to the remaining two pairs in `python/benchmarks/bench_eval_type.py`:
- `_WindowAggPandasBenchMixin` now subclasses `_WindowAggArrowBenchMixin`
- `_CogroupedMapPandasBenchMixin` now subclasses
`_CogroupedMapArrowBenchMixin`
The shared `_build_scenario` / `_write_scenario` are pulled up into the
Arrow base, with the eval type parameterized via the `_eval_type` class
attribute and `_build_scenario` converted from `staticmethod` to `classmethod`
so subclasses read their own `_scenario_configs` (the same mechanism
SPARK-57137 used for the Scalar and GroupedAgg pairs).
- Window: the Pandas half drops its own `_scenario_configs` entirely (it was
identical to the Arrow variant's) and keeps only `_eval_type`, its UDFs, and
`params`.
- Cogroup: the Arrow `_udfs` values are normalized to `(func, n_args)`
tuples to match the Pandas sibling, so the inherited `_write_scenario` works
unchanged. The Pandas half keeps its own scaled-down `_scenario_configs` and
the extra 3-arg `key_identity_udf` variant.
Net diff: +39 / -102 lines.
### Why are the changes needed?
These two pairs were intentionally left out of SPARK-57137 to avoid
conflicting with two in-flight PRs (#56167 made the Window Arrow mixin
scenarios lazy; #56171 renamed `wide_values` to `wide_cols` in the Cogroup
pair). Both have since merged, so the pairs can now be folded into the same
base-class pattern.
Before this change, each sibling pair carried two near-identical
`_write_scenario` bodies differing only in the `PythonEvalType.SQL_...`
constant (and, for Cogroup, the UDF set) -- a known footgun where any
protocol-writing change had to be applied in lock-step across both halves.
### Does this PR introduce _any_ user-facing change?
No. Test-only change in the benchmark module.
### How was this patch tested?
- Structural: confirmed `_eval_type` resolves correctly via MRO for all
four affected mixins; also confirmed that
`_CogroupedMapPandasBenchMixin._scenario_configs` still holds the scaled-down
pandas row counts (not the Arrow base's), which rules out the main
MRO-resolution risk of switching `_build_scenario` to `classmethod`.
- Ran `setup` + `time_worker` end-to-end for all four affected `*TimeBench`
classes across every UDF (including the Pandas-only 3-arg `key_identity_udf`).
- Ran `peakmem_worker` (disk-replay path) for the Window and Cogroup Pandas
classes.
- Confirmed the generated wire bytes are byte-identical to the pre-refactor
output for Window and Cogroup Arrow; the Cogroup Pandas UDF pickles differ only
in the embedded class-hierarchy reference (same length, identical execution),
matching what SPARK-57137 produced for the Scalar/GroupedAgg pairs.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude Code (claude-opus-4-8)
Closes #56194 from viirya/SPARK-55724-window-cogroup-base.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
python/benchmarks/bench_eval_type.py | 141 ++++++++++-------------------------
1 file changed, 39 insertions(+), 102 deletions(-)
diff --git a/python/benchmarks/bench_eval_type.py
b/python/benchmarks/bench_eval_type.py
index 1140ec7db3d6..17b112e5f89f 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -758,17 +758,15 @@ class _CogroupedMapArrowBenchMixin:
"multi_key": (200, 5_000, 3, 5),
}
- @staticmethod
- def _build_scenario(name):
+ @classmethod
+ def _build_scenario(cls, name):
"""Build a cogroup scenario: two DataFrames with the same grouping
structure.
Unlike grouped map (which wraps columns in a struct), cogroup batches
have flat columns: [key_col_0, ..., key_col_k, val_col_0, ...,
val_col_v].
"""
np.random.seed(42)
- num_groups, rows_per_group, num_key_cols, num_value_cols = (
- _CogroupedMapArrowBenchMixin._scenario_configs[name]
- )
+ num_groups, rows_per_group, num_key_cols, num_value_cols =
cls._scenario_configs[name]
n_cols = num_key_cols + num_value_cols
type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
while len(type_pool) < n_cols:
@@ -784,22 +782,27 @@ class _CogroupedMapArrowBenchMixin:
return_type = StructType(schema.fields[num_key_cols:])
return (cogroups, return_type, num_key_cols, num_value_cols)
+ _eval_type = PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF
+ # Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
+ # n_args=3 -> func(key, left, right). The Arrow path has no 3-arg variant,
+ # but the tuple shape is shared with the Pandas sibling so
``_write_scenario``
+ # can be inherited unchanged.
_udfs = {
- "identity_udf": _cogrouped_map_arrow_identity,
- "concat_udf": _cogrouped_map_arrow_concat,
- "left_semi_udf": _cogrouped_map_arrow_left_semi,
+ "identity_udf": (_cogrouped_map_arrow_identity, 2),
+ "concat_udf": (_cogrouped_map_arrow_concat, 2),
+ "left_semi_udf": (_cogrouped_map_arrow_left_semi, 2),
}
params = [list(_scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
def _write_scenario(self, scenario, udf_name, buf):
groups, schema, num_key_cols, num_value_cols =
self._build_scenario(scenario)
- udf_func = self._udfs[udf_name]
+ udf_func, _ = self._udfs[udf_name]
left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols,
num_value_cols)
right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols,
num_value_cols)
arg_offsets = left_offsets + right_offsets
MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
+ self._eval_type,
lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema,
arg_offsets, b),
lambda b: MockProtocolWriter.write_grouped_data_payload(groups,
buf=b),
buf,
@@ -819,8 +822,15 @@ class
CogroupedMapArrowUDFPeakmemBench(_CogroupedMapArrowBenchMixin, _PeakmemBen
# ``pandas.DataFrame``. Optional 3-arg variant ``(key, left, right)``.
-class _CogroupedMapPandasBenchMixin:
- """Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF."""
+class _CogroupedMapPandasBenchMixin(_CogroupedMapArrowBenchMixin):
+ """Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF.
+
+ Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+ sibling; only the eval type, the UDFs, and the per-scenario row counts
+ differ. Adds a 3-arg ``key_identity_udf`` variant the Arrow path lacks
+ (``_write_scenario`` ignores the ``n_args`` slot, so the extra entry is
+ handled by the inherited writer).
+ """
def _cogrouped_map_pandas_identity(left, right):
"""Identity cogroup UDF: returns left DataFrame as-is."""
@@ -852,32 +862,7 @@ class _CogroupedMapPandasBenchMixin:
"multi_key": (100, 1_000, 3, 5),
}
- @staticmethod
- def _build_scenario(name):
- """Build a cogroup scenario: two DataFrames with the same grouping
structure.
-
- Like cogrouped arrow, batches have flat columns:
- [key_col_0, ..., key_col_k, val_col_0, ..., val_col_v].
- """
- np.random.seed(42)
- num_groups, rows_per_group, num_key_cols, num_value_cols = (
- _CogroupedMapPandasBenchMixin._scenario_configs[name]
- )
- n_cols = num_key_cols + num_value_cols
- type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
- while len(type_pool) < n_cols:
- type_pool = type_pool + MockDataFactory.MIXED_TYPES[: n_cols -
len(type_pool)]
-
- cogroups, schema = MockDataFactory.make_cogrouped_batches(
- num_groups=num_groups,
- num_rows=rows_per_group,
- num_cols=n_cols,
- spark_type_pool=type_pool,
- batch_size=rows_per_group,
- )
- return_type = StructType(schema.fields[num_key_cols:])
- return (cogroups, return_type, num_key_cols, num_value_cols)
-
+ _eval_type = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
# Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
# n_args=3 -> func(key, left, right).
_udfs = {
@@ -889,19 +874,6 @@ class _CogroupedMapPandasBenchMixin:
params = [list(_scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
- def _write_scenario(self, scenario, udf_name, buf):
- groups, schema, num_key_cols, num_value_cols =
self._build_scenario(scenario)
- udf_func, _ = self._udfs[udf_name]
- left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols,
num_value_cols)
- right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols,
num_value_cols)
- arg_offsets = left_offsets + right_offsets
- MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
- lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema,
arg_offsets, b),
- lambda b: MockProtocolWriter.write_grouped_data_payload(groups,
buf=b),
- buf,
- )
-
class CogroupedMapPandasUDFTimeBench(_CogroupedMapPandasBenchMixin,
_TimeBenchBase):
pass
@@ -1729,11 +1701,11 @@ class _WindowAggArrowBenchMixin:
"wide_cols": (200, 5_000, 20),
}
- @staticmethod
- def _build_scenario(name):
+ @classmethod
+ def _build_scenario(cls, name):
"""Build a single scenario by name."""
np.random.seed(42)
- num_groups, rows_per_group, n_cols =
_WindowAggArrowBenchMixin._scenario_configs[name]
+ num_groups, rows_per_group, n_cols = cls._scenario_configs[name]
return MockDataFactory.make_grouped_batches(
num_groups=num_groups,
num_rows=rows_per_group,
@@ -1742,6 +1714,7 @@ class _WindowAggArrowBenchMixin:
batch_size=rows_per_group,
)
+ _eval_type = PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF
_udfs = {
"sum_udf": _window_agg_arrow_sum,
"mean_multi_udf": _window_agg_arrow_mean_multi,
@@ -1765,7 +1738,7 @@ class _WindowAggArrowBenchMixin:
MockProtocolWriter.write_udf_payload(udf_func, return_type,
arg_offsets, b)
MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
+ self._eval_type,
write_udf,
lambda b: MockProtocolWriter.write_grouped_data_payload(groups,
buf=b),
buf,
@@ -1785,8 +1758,15 @@ class
WindowAggArrowUDFPeakmemBench(_WindowAggArrowBenchMixin, _PeakmemBenchBase
# UDF receives ``pd.Series`` columns for the entire window partition, returns
scalar.
-class _WindowAggPandasBenchMixin:
- """Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF."""
+class _WindowAggPandasBenchMixin(_WindowAggArrowBenchMixin):
+ """Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF.
+
+ Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
+ sibling; only the eval type and the UDFs differ. ``_scenario_configs``
+ is intentionally identical to the Arrow variant for apples-to-apples
+ comparison (the aggregations are cheap enough that pandas conversion
+ is not the bottleneck here).
+ """
def _window_agg_pandas_sum(col):
"""Sum a single Pandas Series."""
@@ -1796,57 +1776,14 @@ class _WindowAggPandasBenchMixin:
"""Mean of two Pandas Series combined."""
return (col0.mean() or 0) + (col1.mean() or 0)
- _scenario_configs = {
- "few_groups_sm": (50, 5_000, 5),
- "few_groups_lg": (50, 50_000, 5),
- "many_groups_sm": (2_000, 500, 5),
- "many_groups_lg": (500, 10_000, 5),
- "wide_cols": (200, 5_000, 20),
- }
-
- @staticmethod
- def _build_scenario(name):
- """Build a single scenario by name."""
- np.random.seed(42)
- num_groups, rows_per_group, n_cols =
_WindowAggPandasBenchMixin._scenario_configs[name]
- return MockDataFactory.make_grouped_batches(
- num_groups=num_groups,
- num_rows=rows_per_group,
- num_cols=n_cols,
- spark_type_pool=MockDataFactory.NUMERIC_TYPES,
- batch_size=rows_per_group,
- )
-
+ _eval_type = PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF
_udfs = {
"sum_udf": _window_agg_pandas_sum,
"mean_multi_udf": _window_agg_pandas_mean_multi,
}
- params = [list(_scenario_configs), list(_udfs)]
+ params = [list(_WindowAggArrowBenchMixin._scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]
- def _write_scenario(self, scenario, udf_name, buf):
- groups, _schema = self._build_scenario(scenario)
- udf_func = self._udfs[udf_name]
-
- # sum_udf uses 1 arg, mean_multi_udf uses 2 args
- if "multi" in udf_name:
- arg_offsets = [0, 1]
- else:
- arg_offsets = [0]
-
- return_type = DoubleType()
-
- def write_udf(b):
- MockProtocolWriter.write_udf_payload(udf_func, return_type,
arg_offsets, b)
-
- MockProtocolWriter.write_worker_input(
- PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
- write_udf,
- lambda b: MockProtocolWriter.write_grouped_data_payload(groups,
buf=b),
- buf,
- runner_conf={"window_bound_types": "unbounded"},
- )
-
class WindowAggPandasUDFTimeBench(_WindowAggPandasBenchMixin, _TimeBenchBase):
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]