This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 70469a28fe99 [SPARK-55934][PYTHON][TEST][FOLLOWUP] Fix MAP_ARROW_ITER bench UDF return type
70469a28fe99 is described below
commit 70469a28fe99d3f86900fbdf15ad375a25dc2dec
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed May 27 23:51:13 2026 -0700
[SPARK-55934][PYTHON][TEST][FOLLOWUP] Fix MAP_ARROW_ITER bench UDF return type
### What changes were proposed in this pull request?
Fix the default `ret_type` resolution in
`_MapArrowIterBenchMixin._write_scenario` in
`python/benchmarks/bench_eval_type.py`. The three benchmark UDFs
(`identity_udf`, `sort_udf`, `filter_udf`) all return whole `pa.RecordBatch`es
with the input row schema, so their declared return type should be the inner
row `StructType`, not the first nested field's data type.
Before:
```python
ret_type = schema.fields[0].dataType.fields[0].dataType  # first column's type
```
After:
```python
ret_type = schema.fields[0].dataType # the row's StructType
```
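To make the difference between the two expressions concrete, here is a minimal sketch. It does not use PySpark: `StructField` and `StructType` below are hypothetical stand-ins that model only the `fields` and `dataType` attributes, and the column names and types are made up for illustration.

```python
from dataclasses import dataclass
from typing import List, Union


# Hypothetical stand-ins for the pyspark.sql.types classes, reduced to the
# two attributes the expressions touch (`fields` and `dataType`).
@dataclass
class StructField:
    name: str
    dataType: Union[str, "StructType"]


@dataclass
class StructType:
    fields: List[StructField]


# Assumed input row schema; names and types are illustrative only.
row_schema = StructType([StructField("a", "int"), StructField("b", "string")])

# Wire schema: the row schema wrapped in a single struct column.
wire_schema = StructType([StructField("_0", row_schema)])

before = wire_schema.fields[0].dataType.fields[0].dataType  # first column's type
after = wire_schema.fields[0].dataType  # the row's StructType

print(before)  # int
print([f.name for f in after.fields])  # ['a', 'b']
```

With these stand-ins, the old expression resolves to a single column's type, while the new one resolves to the whole row struct.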
### Why are the changes needed?
`mapInArrow` UDFs are contractually `(Iterator[pa.RecordBatch]) ->
Iterator[pa.RecordBatch]`, with the user-supplied `schema` describing the
output rows as a whole. The benchmark UDFs return full batches but were
declaring just one column's type, which is semantically inconsistent with the
API.
This did not surface as an error because `worker.py` discards `return_type`
for `SQL_MAP_ARROW_ITER_UDF` in `read_single_udf` (returns `None`) and only
checks `Iterator[pa.RecordBatch]` structurally via `verify_return_type`.
Schema-level validation is currently absent, so the mismatched type was
tolerated. If the worker ever adds schema validation for this eval type, the
previous declaration would break the benchmark.
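The gap between a structural check and a schema-level check can be sketched as follows. This is not the code in `worker.py`: `FakeBatch`, `structural_check`, and `schema_check` are hypothetical names, and the schema check shown is only one way such validation might look if it were added.

```python
from typing import Iterator, List


class FakeBatch:
    """Hypothetical stand-in for pa.RecordBatch; only column names matter here."""

    def __init__(self, column_names: List[str]) -> None:
        self.column_names = column_names


def structural_check(result: object) -> Iterator[FakeBatch]:
    """Check only that the result is an iterator of batches, ignoring columns."""
    if not hasattr(result, "__next__"):
        raise TypeError("UDF must return an iterator")
    for batch in result:
        if not isinstance(batch, FakeBatch):
            raise TypeError("iterator must yield batches")
        yield batch


def schema_check(result: object, declared: List[str]) -> Iterator[FakeBatch]:
    """Hypothetical stricter check: also compare columns to the declared type."""
    for batch in structural_check(result):
        if batch.column_names != declared:
            raise TypeError(f"expected columns {declared}, got {batch.column_names}")
        yield batch


full_batch = FakeBatch(["a", "b"])
declared_old = ["a"]  # analogue of declaring only the first column's type
declared_new = ["a", "b"]  # analogue of declaring the whole row struct

# The structural check accepts the batch regardless of what was declared.
passed_structural = list(structural_check(iter([full_batch])))

# A schema-level check accepts the new declaration and rejects the old one.
passed_new = list(schema_check(iter([full_batch]), declared_new))
try:
    list(schema_check(iter([full_batch]), declared_old))
    old_rejected = False
except TypeError:
    old_rejected = True
```

In this sketch the old declaration passes only as long as nothing compares the batch columns against it, which mirrors why the mismatch went unnoticed.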
### Does this PR introduce _any_ user-facing change?
No. Test-only change in the benchmark module.
### How was this patch tested?
- Confirmed the new default resolves to a 5-field `StructType` matching the
input row schema for `sm_batch_few_col`.
- Ran `MapArrowIterUDFTimeBench.setup` + `time_worker` for
`(sm_batch_few_col, pure_ints) x (identity_udf, sort_udf, filter_udf)`.
- Ran `MapArrowIterUDFPeakmemBench.setup` + `peakmem_worker` for
`sm_batch_few_col/identity_udf`.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude Code (claude-opus-4-7)
Closes #56168 from viirya/SPARK-55934-followup.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
python/benchmarks/bench_eval_type.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py
index 131ced87dfc8..57754fc8a9f5 100644
--- a/python/benchmarks/bench_eval_type.py
+++ b/python/benchmarks/bench_eval_type.py
@@ -1457,7 +1457,10 @@ class _MapArrowIterBenchMixin:
         batches, schema = self._build_scenario(scenario)
         udf_func, ret_type, arg_offsets = self._udfs[udf_name]
         if ret_type is None:
-            ret_type = schema.fields[0].dataType.fields[0].dataType
+            # mapInArrow UDFs return an Iterator[pa.RecordBatch] with the same
+            # schema as the input row (the inner struct, since make_batches
+            # wraps the row schema in a single struct column for the wire).
+            ret_type = schema.fields[0].dataType
         MockProtocolWriter.write_worker_input(
             PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
             lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),