This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 88846d73a13b [SPARK-55899][PYTHON][TEST] Add ASV microbenchmark for SQL_ARROW_BATCHED_UDF
88846d73a13b is described below
commit 88846d73a13bedc56443405604787f62cea25ba5
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Mar 10 10:43:32 2026 +0800
[SPARK-55899][PYTHON][TEST] Add ASV microbenchmark for SQL_ARROW_BATCHED_UDF
### What changes were proposed in this pull request?
Add ASV microbenchmarks for `SQL_ARROW_BATCHED_UDF`.
### Why are the changes needed?
Part of SPARK-55724. Establishes baseline performance metrics for
`SQL_ARROW_BATCHED_UDF` before future refactoring work.
### Does this PR introduce _any_ user-facing change?
No. Benchmark files only.
### How was this patch tested?
`COLUMNS=120 asv run --python=same --bench "ArrowBatched" --attribute "repeat=(3,5,5.0)"`:
**ArrowBatchedUDFTimeBench** (`SQL_ARROW_BATCHED_UDF`):
```
=================== ============== =============== ===============
-- udf
------------------- ----------------------------------------------
scenario identity_udf stringify_udf nullcheck_udf
=================== ============== =============== ===============
sm_batch_few_col 62.1+-0.2ms 66.1+-0.8ms 61.2+-0.1ms
sm_batch_many_col 154+-0.4ms 155+-0.4ms 154+-0.3ms
lg_batch_few_col 148+-0.3ms 157+-0.4ms 147+-0.5ms
lg_batch_many_col 623+-2ms 624+-2ms 620+-3ms
pure_ints 220+-0.5ms 231+-0.7ms 220+-6ms
pure_floats 224+-0.8ms 262+-1ms 225+-0.7ms
pure_strings 414+-1ms 415+-0.6ms 404+-1ms
mixed_types 311+-1ms 318+-0.8ms 308+-0.7ms
=================== ============== =============== ===============
```
**ArrowBatchedUDFPeakmemBench** (`SQL_ARROW_BATCHED_UDF`):
```
=================== ============== =============== ===============
-- udf
------------------- ----------------------------------------------
scenario identity_udf stringify_udf nullcheck_udf
=================== ============== =============== ===============
sm_batch_few_col 119M 119M 118M
sm_batch_many_col 123M 123M 123M
lg_batch_few_col 124M 124M 122M
lg_batch_many_col 159M 160M 159M
pure_ints 122M 123M 122M
pure_floats 124M 125M 123M
pure_strings 125M 125M 124M
mixed_types 123M 124M 123M
=================== ============== =============== ===============
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54702 from Yicong-Huang/SPARK-55724/bench/arrow-batch-udf.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/benchmarks/bench_eval_type.py | 109 +++++++++++++++++++++++++++++++++++
1 file changed, 109 insertions(+)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 53a4f208e99d..920b64a53e2a 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -329,6 +329,115 @@ class _NonGroupedBenchMixin:
)
+# -- SQL_ARROW_BATCHED_UDF --------------------------------------------------
+# Arrow-optimized Python UDF: receives individual Python values per row,
+# returns a single Python value. The wire protocol includes an extra
+# ``input_type`` (StructType JSON) before the UDF payload.
+
+
+def _build_arrow_batched_scenarios():
+ """Build scenarios for SQL_ARROW_BATCHED_UDF.
+
+ Returns a dict mapping scenario name to
+ ``(batch, num_batches, input_struct_type, col0_type)``.
+ ``input_struct_type`` is a StructType matching the batch schema,
+ needed for the wire protocol.
+ """
+ scenarios = {}
+
+ # Row-by-row processing is ~100x slower than columnar Arrow UDFs,
+ # so batch counts are much smaller to keep benchmarks under 60s.
+ for name, (rows, n_cols, num_batches) in {
+ "sm_batch_few_col": (1_000, 5, 20),
+ "sm_batch_many_col": (1_000, 50, 5),
+ "lg_batch_few_col": (10_000, 5, 5),
+ "lg_batch_many_col": (10_000, 50, 2),
+ }.items():
+ batch, col0_type = _make_typed_batch(rows, n_cols)
+ type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()]
+ input_struct = StructType(
+        [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(n_cols)]
+ )
+ scenarios[name] = (batch, num_batches, input_struct, col0_type)
+
+ _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 10
+
+ for scenario_name, make_array, spark_type in [
+ (
+ "pure_ints",
+ lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)),
+ IntegerType(),
+ ),
+ ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()),
+        ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()),
+ ]:
+        batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type)
+        input_struct = StructType([StructField(f"col_{i}", spark_type) for i in range(_PURE_COLS)])
+        scenarios[scenario_name] = (batch, _PURE_BATCHES, input_struct, spark_type)
+
+ # mixed types
+ batch, col0_type = _make_typed_batch(_PURE_ROWS, _PURE_COLS)
+ type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()]
+ input_struct = StructType(
+        [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(_PURE_COLS)]
+ )
+ scenarios["mixed_types"] = (batch, _PURE_BATCHES, input_struct, col0_type)
+
+ return scenarios
+
+
+_ARROW_BATCHED_SCENARIOS = _build_arrow_batched_scenarios()
+
+
+# UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values.
+# arg_offsets=[0] means the UDF receives column 0 value per row.
+_ARROW_BATCHED_UDFS = {
+ "identity_udf": (lambda x: x, None, [0]),
+ "stringify_udf": (lambda x: str(x), StringType(), [0]),
+ "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]),
+}
+
+
+class _ArrowBatchedBenchMixin:
+ """Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF.
+
+ Like ``_NonGroupedBenchMixin`` but writes the extra ``input_type``
+ (StructType JSON) that the wire protocol requires.
+ """
+
+ def _write_scenario(self, scenario, udf_name, buf):
+ batch, num_batches, input_struct, col0_type = self._scenarios[scenario]
+ udf_func, ret_type, arg_offsets = self._udfs[udf_name]
+ if ret_type is None:
+ ret_type = col0_type
+
+ def write_command(b):
+ # input_type is read before UDF payloads for ARROW_BATCHED_UDF
+ _write_utf8(input_struct.json(), b)
+ _build_udf_payload(udf_func, ret_type, arg_offsets, b)
+
+ _write_worker_input(
+ PythonEvalType.SQL_ARROW_BATCHED_UDF,
+ write_command,
+            lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b),
+ buf,
+ )
+
+
+class ArrowBatchedUDFTimeBench(_ArrowBatchedBenchMixin, _TimeBenchBase):
+ _scenarios = _ARROW_BATCHED_SCENARIOS
+ _udfs = _ARROW_BATCHED_UDFS
+ params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
+class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase):
+ _scenarios = _ARROW_BATCHED_SCENARIOS
+ _udfs = _ARROW_BATCHED_UDFS
+ params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)]
+ param_names = ["scenario", "udf"]
+
+
# -- SQL_SCALAR_ARROW_UDF ---------------------------------------------------
# UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``.
# All UDFs operate on arg_offsets=[0] so they work with any column type.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]