This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dcde7faa612a [SPARK-55726][PYTHON][TEST] Add ASV microbenchmark for
grouped map pandas UDF
dcde7faa612a is described below
commit dcde7faa612a519cbae9174d62b801f1685d3ba4
Author: Yicong-Huang <[email protected]>
AuthorDate: Wed Mar 4 17:33:35 2026 -0800
[SPARK-55726][PYTHON][TEST] Add ASV microbenchmark for grouped map pandas
UDF
### What changes were proposed in this pull request?
Add an ASV microbenchmark for `SQL_GROUPED_MAP_PANDAS_UDF` in
`python/benchmarks/bench_eval_type.py`.
The benchmark simulates the full `worker.py` pipeline by constructing the
complete binary protocol that `main(infile, outfile)` expects.
Large groups (100k rows/group) are split into Arrow sub-batches of 10k rows
(the default of `spark.sql.execution.arrow.maxRecordsPerBatch`) when the
benchmark input stream is built, mirroring the JVM-side behaviour.
### Why are the changes needed?
This is part of SPARK-55724 to add per-eval-type microbenchmarks for
PySpark UDF worker pipelines. These benchmarks help catch performance
regressions in the Python-side serialization/deserialization path (e.g.,
SPARK-55459 fixed a 3x regression in `applyInPandas`).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`asv run --python=same -a repeat='(5,10,10.0)'` on Apple M4 Max:
```
GroupedMapPandasUDFBench:
time_small_groups_few_cols (1k rows/group, 5 cols, 1500 groups):
884±3ms
peakmem_small_groups_few_cols :
1.96 G
time_small_groups_many_cols (1k rows/group, 50 cols, 200 groups):
741±5ms
peakmem_small_groups_many_cols :
1.99 G
time_large_groups_few_cols (100k rows/group, 5 cols, 350 groups):
786±60ms
peakmem_large_groups_few_cols :
3.19 G
time_large_groups_many_cols (100k rows/group, 50 cols, 40 groups):
681±50ms
peakmem_large_groups_many_cols :
3.74 G
time_mixed_types (mixed cols, 1-arg UDF, 1300 groups):
884±3ms
peakmem_mixed_types :
1.90 G
time_mixed_types_two_args (mixed cols, 2-arg UDF, 1600 groups):
822±4ms
peakmem_mixed_types_two_args :
1.91 G
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54533 from
Yicong-Huang/SPARK-55726/benchmark/grouped-map-pandas-udf.
Lead-authored-by: Yicong-Huang
<[email protected]>
Co-authored-by: Yicong Huang
<[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
python/benchmarks/bench_eval_type.py | 348 +++++++++++++++++++++++++++++++++++
1 file changed, 348 insertions(+)
diff --git a/python/benchmarks/bench_eval_type.py
b/python/benchmarks/bench_eval_type.py
new file mode 100644
index 000000000000..eaf5313a4cc8
--- /dev/null
+++ b/python/benchmarks/bench_eval_type.py
@@ -0,0 +1,348 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Microbenchmarks for PySpark UDF eval types.
+
+Each benchmark class simulates the full worker pipeline for one eval type
+by constructing the complete binary protocol that ``worker.py``'s
+``main(infile, outfile)`` expects.
+"""
+
import io
import json
import struct
import sys
from typing import Any, Callable, Optional

import numpy as np
import pyarrow as pa

from pyspark.cloudpickle import dumps as cloudpickle_dumps
from pyspark.serializers import write_int, write_long
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)
from pyspark.util import PythonEvalType
from pyspark.worker import main as worker_main
+
+
+# ---------------------------------------------------------------------------
+# Wire-format helpers
+# ---------------------------------------------------------------------------
+
+
def _write_utf8(s: str, buf: io.BytesIO) -> None:
    """Append *s* to *buf* as a ``write_int`` byte-length prefix plus its UTF-8 bytes.

    This is the framing that ``UTF8Deserializer.loads`` reads back.
    """
    data = s.encode("utf-8")
    size = len(data)
    write_int(size, buf)
    buf.write(data)
+
+
def _write_bool(val: bool, buf: io.BytesIO) -> None:
    """Append *val* to *buf* as one packed boolean byte (``struct`` format ``"!?"``)."""
    packed = struct.pack("!?", val)
    buf.write(packed)
+
+
+# ---------------------------------------------------------------------------
+# Worker protocol builder
+# ---------------------------------------------------------------------------
+
+
def _build_preamble(buf: io.BytesIO) -> None:
    """Emit the fields ``main()`` consumes ahead of ``eval_type``.

    In order: split index, the running interpreter's ``major.minor`` version,
    a task-context JSON document, the spark-files directory, an empty Python
    include list, the broadcast-decryption-server flag (off), and an empty
    broadcast-variable list.
    """
    major, minor = sys.version_info[:2]
    task_context = {
        "isBarrier": False,
        "stageId": 0,
        "partitionId": 0,
        "attemptNumber": 0,
        "taskAttemptId": 0,
        "cpus": 1,
        "resources": {},
        "localProperties": {},
    }

    write_int(0, buf)  # split_index
    _write_utf8(f"{major}.{minor}", buf)  # python version
    _write_utf8(json.dumps(task_context), buf)  # task context
    _write_utf8("/tmp", buf)  # spark_files_dir
    write_int(0, buf)  # num_python_includes
    _write_bool(False, buf)  # needs_broadcast_decryption_server
    write_int(0, buf)  # num_broadcast_variables
+
+
def _build_udf_payload(
    udf_func: Callable[..., Any],
    return_type: StructType,
    arg_offsets: list[int],
    buf: io.BytesIO,
) -> None:
    """Emit the ``read_single_udf`` section for exactly one, unchained UDF.

    Every entry of *arg_offsets* is written followed by an ``is_kwarg=False``
    flag. Then comes the cloudpickled ``(udf_func, return_type)`` command,
    length-prefixed, and finally a ``result_id`` of 0.
    """
    udf_count = 1
    chain_length = 1
    result_id = 0

    write_int(udf_count, buf)  # num_udfs
    write_int(len(arg_offsets), buf)  # num_arg
    for arg_offset in arg_offsets:
        write_int(arg_offset, buf)
        _write_bool(False, buf)  # is_kwarg

    write_int(chain_length, buf)  # num_chained
    pickled_command = cloudpickle_dumps((udf_func, return_type))
    write_int(len(pickled_command), buf)
    buf.write(pickled_command)

    write_long(result_id, buf)  # result_id
+
+
def _build_grouped_arrow_data(
    arrow_batch: pa.RecordBatch,
    num_groups: int,
    buf: io.BytesIO,
    max_records_per_batch: Optional[int] = None,
) -> None:
    """Write grouped-map Arrow data: ``(write_int(1) + IPC stream) * num_groups + write_int(0)``.

    Every group carries the same *arrow_batch*. Its IPC stream is therefore
    serialized once and the resulting bytes are written for each group. This
    produces the same stream for every group, but avoids re-encoding large
    batches ``num_groups`` times during setup.

    When *max_records_per_batch* is a positive int smaller than the batch's
    row count, each group's stream holds consecutive slices of at most that
    many rows. This mirrors what the JVM does under
    ``spark.sql.execution.arrow.maxRecordsPerBatch``. ``None`` or ``0``
    disables splitting.

    Raises
    ------
    ValueError
        If *max_records_per_batch* is negative. Such a value would otherwise
        yield groups that contain no record batches at all.
    """
    if max_records_per_batch is not None and max_records_per_batch < 0:
        raise ValueError(
            f"max_records_per_batch must be non-negative, got {max_records_per_batch}"
        )

    # Serialize one group's IPC stream (schema + batches + EOS) into memory.
    # Closing the writer does not close the sink; the original code relied on
    # the same behaviour when it kept writing to ``buf`` after ``close()``.
    group_sink = io.BytesIO()
    with pa.RecordBatchStreamWriter(group_sink, arrow_batch.schema) as writer:
        if max_records_per_batch and arrow_batch.num_rows > max_records_per_batch:
            for offset in range(0, arrow_batch.num_rows, max_records_per_batch):
                writer.write_batch(arrow_batch.slice(offset, max_records_per_batch))
        else:
            writer.write_batch(arrow_batch)
    group_payload = group_sink.getvalue()

    for _ in range(num_groups):
        write_int(1, buf)  # another group follows
        buf.write(group_payload)
    write_int(0, buf)  # no more groups
+
+
def _write_conf(conf: Optional[dict[str, str]], buf: io.BytesIO) -> None:
    """Write one conf section: an entry count, then each key and value via ``_write_utf8``.

    ``None`` and an empty mapping both write a bare ``0`` count.

    NOTE(review): the key/value framing is assumed to match how ``worker.py``
    reads RunnerConf/EvalConf entries. Confirm before relying on non-empty confs.
    """
    entries = conf or {}
    write_int(len(entries), buf)
    for key, value in entries.items():
        _write_utf8(key, buf)
        _write_utf8(value, buf)


def _build_worker_input(
    eval_type: int,
    udf_func: Callable[..., Any],
    return_type: StructType,
    arg_offsets: list[int],
    arrow_batch: pa.RecordBatch,
    num_groups: int,
    max_records_per_batch: Optional[int] = None,
    *,
    runner_conf: Optional[dict[str, str]] = None,
    eval_conf: Optional[dict[str, str]] = None,
) -> bytes:
    """Assemble the full binary stream consumed by ``worker_main(infile, outfile)``.

    Stream layout: preamble, ``eval_type``, RunnerConf, EvalConf, the UDF
    payload, the grouped Arrow data and finally ``END_OF_STREAM`` (``-4``).

    Parameters
    ----------
    eval_type : int
        ``PythonEvalType`` value written right after the preamble.
    udf_func, return_type, arg_offsets
        Forwarded to ``_build_udf_payload``.
    arrow_batch, num_groups
        Forwarded to ``_build_grouped_arrow_data``; every group repeats
        *arrow_batch*.
    max_records_per_batch : int, optional
        When set, each group's Arrow data is split into sub-batches of this
        size, mirroring the JVM-side behaviour of
        ``spark.sql.execution.arrow.maxRecordsPerBatch``. This only changes
        how the data is laid out; it is NOT added to *runner_conf*.
    runner_conf, eval_conf : dict[str, str], optional
        Key/value pairs for the RunnerConf and EvalConf sections. When
        omitted (the default), the section is written with a ``0`` entry count.

    Returns
    -------
    bytes
        The complete worker input stream.
    """
    buf = io.BytesIO()

    _build_preamble(buf)
    write_int(eval_type, buf)

    _write_conf(runner_conf, buf)  # RunnerConf
    _write_conf(eval_conf, buf)  # EvalConf

    _build_udf_payload(udf_func, return_type, arg_offsets, buf)
    _build_grouped_arrow_data(arrow_batch, num_groups, buf, max_records_per_batch)
    write_int(-4, buf)  # SpecialLengths.END_OF_STREAM

    return buf.getvalue()
+
+
+# ---------------------------------------------------------------------------
+# Data helpers
+# ---------------------------------------------------------------------------
+
+
def _build_grouped_arg_offsets(n_cols: int, n_keys: int = 0) -> list[int]:
    """Return grouped-map argument offsets for an ``n_cols``-column batch.

    Layout: ``[len, num_keys, key_col_0, …, val_col_0, …]``. Here ``len``
    counts every element that follows it. The first ``n_keys`` columns are
    the grouping keys and the remaining columns are the values. Together they
    cover column indices ``0 .. n_cols - 1`` in order.

    Raises
    ------
    ValueError
        If ``n_keys`` is negative or larger than ``n_cols``. Such a value would
        otherwise reference columns that do not exist.
    """
    if not 0 <= n_keys <= n_cols:
        raise ValueError(f"n_keys must be in [0, {n_cols}], got {n_keys}")
    # Keys are range(n_keys) and values are range(n_keys, n_cols), so together
    # they are simply every column index in order.
    offsets = [n_keys, *range(n_cols)]
    return [len(offsets), *offsets]
+
+
def _make_grouped_batch(rows_per_group: int, n_cols: int) -> tuple[pa.RecordBatch, StructType]:
    """Build one group of data and the Spark schema that describes it.

    Columns: ``group_key`` (int64, all zeros) followed by ``n_cols - 1``
    float32 value columns named ``some_field_<i>`` holding uniform random
    values in ``[0, 1)``.

    The schema declares ``LongType``/``FloatType`` so that it agrees with the
    Arrow types. The benchmarks also use it as the identity UDF's return type.
    A mismatched type there (e.g. ``IntegerType`` for int64 data) would
    presumably add a type conversion that real data of these types would not need.

    Raises
    ------
    ValueError
        If ``n_cols`` is less than 1, leaving no room for ``group_key``.
    """
    if n_cols < 1:
        raise ValueError(f"n_cols must be >= 1 to hold group_key, got {n_cols}")
    num_values = n_cols - 1
    arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [
        pa.array(np.random.rand(rows_per_group).astype(np.float32)) for _ in range(num_values)
    ]
    fields = [StructField("group_key", LongType())] + [
        StructField(f"some_field_{i}", FloatType()) for i in range(num_values)
    ]
    return (
        pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]),
        StructType(fields),
    )
+
+
def _make_mixed_batch(rows_per_group: int) -> tuple[pa.RecordBatch, StructType]:
    """Build one mixed-type group and the Spark schema that describes it.

    Columns, with their Arrow data and declared Spark types:

    * ``id``: int64 zeros, ``LongType`` (used as the grouping key)
    * ``str_col``: strings ``"s0", "s1", …``, ``StringType``
    * ``float_col``: float32 uniform random values, ``FloatType``
    * ``double_col``: float64 uniform random values, ``DoubleType``
    * ``long_col``: int64 zeros, ``LongType``

    Each declared type matches its Arrow column. The 1-arg mixed benchmark
    returns these columns unchanged in type, so a mismatched schema would
    presumably add conversions that real data of these types would not need.
    """
    arrays = [
        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
        pa.array([f"s{j}" for j in range(rows_per_group)]),
        pa.array(np.random.rand(rows_per_group).astype(np.float32)),
        pa.array(np.random.rand(rows_per_group)),
        pa.array(np.zeros(rows_per_group, dtype=np.int64)),
    ]
    fields = [
        StructField("id", LongType()),
        StructField("str_col", StringType()),
        StructField("float_col", FloatType()),
        StructField("double_col", DoubleType()),
        StructField("long_col", LongType()),
    ]
    return (
        pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]),
        StructType(fields),
    )
+
+
+# ---------------------------------------------------------------------------
+# SQL_GROUPED_MAP_PANDAS_UDF
+# ---------------------------------------------------------------------------
+
+
class GroupedMapPandasUDFBench:
    """Full worker round-trip for ``SQL_GROUPED_MAP_PANDAS_UDF``.

    Large groups (100k rows) are split into Arrow sub-batches of at most
    ``_MAX_RECORDS_PER_BATCH`` rows, mirroring the JVM-side splitting
    behaviour (``spark.sql.execution.arrow.maxRecordsPerBatch`` default
    10 000). Small groups (1k rows) are below that limit and stay unsplit.
    The 3-row mixed-type cases are built without any splitting.

    NOTE(review): ``setup()`` builds the input stream for *every* case before
    any single benchmark runs. ASV's ``peakmem_*`` measurement covers the
    whole benchmark process, including setup. Each ``peakmem_*`` result
    therefore presumably also counts the inputs of the other, larger cases,
    so treat these numbers as upper bounds. Isolating them would require
    per-case classes or ASV ``params``, which would rename the benchmarks.
    """

    # Matches the spark.sql.execution.arrow.maxRecordsPerBatch default.
    _MAX_RECORDS_PER_BATCH = 10_000

    def setup(self):
        eval_type = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF

        # ---- varying group size (float data, identity UDF) ----
        # name -> (rows_per_group, n_cols, num_groups); stored as ``_<name>_input``.
        for name, (rows_per_group, n_cols, num_groups) in {
            "small_few": (1_000, 5, 1_500),
            "small_many": (1_000, 50, 200),
            "large_few": (100_000, 5, 350),
            "large_many": (100_000, 50, 40),
        }.items():
            batch, schema = _make_grouped_batch(rows_per_group, n_cols)
            setattr(
                self,
                f"_{name}_input",
                _build_worker_input(
                    eval_type,
                    lambda df: df,
                    schema,
                    _build_grouped_arg_offsets(n_cols),
                    batch,
                    num_groups=num_groups,
                    max_records_per_batch=self._MAX_RECORDS_PER_BATCH,
                ),
            )

        # ---- mixed types, 1-arg UDF ----
        mixed_batch, mixed_schema = _make_mixed_batch(3)
        n_mixed = len(mixed_schema.fields)

        def double_col_add_mean(pdf):
            return pdf.assign(double_col=pdf["double_col"] + pdf["double_col"].mean())

        self._mixed_input = _build_worker_input(
            eval_type,
            double_col_add_mean,
            mixed_schema,
            _build_grouped_arg_offsets(n_mixed),
            mixed_batch,
            num_groups=1_300,
        )

        # ---- mixed types, 2-arg UDF with key ----
        # The key is column 0 of the mixed batch (the int64 ``id`` column), so
        # the returned key is declared LongType rather than narrowed to int32.
        two_arg_schema = StructType(
            [StructField("group_key", LongType()), StructField("double_col_mean", DoubleType())]
        )

        def double_col_key_add_mean(key, pdf):
            import pandas as pd

            return pd.DataFrame(
                [{"group_key": key[0], "double_col_mean": pdf["double_col"].mean()}]
            )

        self._two_args_input = _build_worker_input(
            eval_type,
            double_col_key_add_mean,
            two_arg_schema,
            _build_grouped_arg_offsets(n_mixed, n_keys=1),
            mixed_batch,
            num_groups=1_600,
        )

    # -- benchmarks ---------------------------------------------------------

    def _run(self, input_bytes):
        # Feed the prebuilt stream to the real worker entry point; the output
        # is collected in memory and discarded.
        worker_main(io.BytesIO(input_bytes), io.BytesIO())

    def time_small_groups_few_cols(self):
        """1k rows/group, 5 cols, 1500 groups."""
        self._run(self._small_few_input)

    def peakmem_small_groups_few_cols(self):
        """1k rows/group, 5 cols, 1500 groups."""
        self._run(self._small_few_input)

    def time_small_groups_many_cols(self):
        """1k rows/group, 50 cols, 200 groups."""
        self._run(self._small_many_input)

    def peakmem_small_groups_many_cols(self):
        """1k rows/group, 50 cols, 200 groups."""
        self._run(self._small_many_input)

    def time_large_groups_few_cols(self):
        """100k rows/group, 5 cols, 350 groups, split at 10k rows/batch."""
        self._run(self._large_few_input)

    def peakmem_large_groups_few_cols(self):
        """100k rows/group, 5 cols, 350 groups, split at 10k rows/batch."""
        self._run(self._large_few_input)

    def time_large_groups_many_cols(self):
        """100k rows/group, 50 cols, 40 groups, split at 10k rows/batch."""
        self._run(self._large_many_input)

    def peakmem_large_groups_many_cols(self):
        """100k rows/group, 50 cols, 40 groups, split at 10k rows/batch."""
        self._run(self._large_many_input)

    def time_mixed_types(self):
        """Mixed column types, 1-arg UDF, 3 rows/group, 1300 groups."""
        self._run(self._mixed_input)

    def peakmem_mixed_types(self):
        """Mixed column types, 1-arg UDF, 3 rows/group, 1300 groups."""
        self._run(self._mixed_input)

    def time_mixed_types_two_args(self):
        """Mixed column types, 2-arg UDF with key, 3 rows/group, 1600 groups."""
        self._run(self._two_args_input)

    def peakmem_mixed_types_two_args(self):
        """Mixed column types, 2-arg UDF with key, 3 rows/group, 1600 groups."""
        self._run(self._two_args_input)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]