This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 81540fa39b9f [SPARK-54722][PYTHON][SQL] Register Pandas Grouped Iter
Aggregate UDF for SQL usage
81540fa39b9f is described below
commit 81540fa39b9f7d5ebd2958c42479dde23ededa96
Author: Yicong-Huang <[email protected]>
AuthorDate: Thu Dec 18 10:24:14 2025 +0800
[SPARK-54722][PYTHON][SQL] Register Pandas Grouped Iter Aggregate UDF for
SQL usage
### What changes were proposed in this pull request?
This PR adds `SQL_GROUPED_AGG_PANDAS_ITER_UDF` to the list of supported
eval types in the `UDFRegistration.register()` method, allowing users to register
Pandas Grouped Iter Aggregate UDFs for SQL usage.
### Why are the changes needed?
Currently, grouped aggregate Pandas UDFs written with the iterator API cannot
be registered for SQL usage via `spark.udf.register()`. This is inconsistent
with other UDF types such as `SQL_GROUPED_AGG_ARROW_ITER_UDF`, which is already
supported.
With this change, users can now register iterator-based grouped aggregate
UDFs and use them in SQL queries:
```python
@pandas_udf("double")
def sum_iter_udf(it: Iterator[pd.Series]) -> float:
    total = 0.0
    for series in it:
        total += series.sum()
    return total
spark.udf.register("sum_iter_udf", sum_iter_udf)
spark.sql("SELECT sum_iter_udf(v) FROM table GROUP BY id")
```
### Does this PR introduce _any_ user-facing change?
Yes. Users can now register Pandas Grouped Iter Aggregate UDFs
(`Iterator[pd.Series] -> scalar`) for SQL usage.
### How was this patch tested?
Added a new test case `test_register_grouped_agg_iter_udf` in
`python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53493 from
Yicong-Huang/SPARK-54722/feat/register-pandas-grouped-iter-agg-udf.
Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/udf.py | 5 +++--
.../sql/tests/pandas/test_pandas_grouped_map.py | 4 ++--
.../tests/pandas/test_pandas_udf_grouped_agg.py | 26 ++++++++++++++++++++++
python/pyspark/sql/udf.py | 5 +++--
4 files changed, 34 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index a5257ac9d09e..3d61471e8251 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -295,6 +295,7 @@ class UDFRegistration:
PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
]:
raise PySparkTypeError(
@@ -303,8 +304,8 @@ class UDFRegistration:
"eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
"SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
"SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
- "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF or "
- "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+ "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF, "
+ "SQL_GROUPED_AGG_PANDAS_ITER_UDF or SQL_GROUPED_AGG_ARROW_ITER_UDF"
},
)
self.sparkSession._client.register_udf(
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 2f9e4bbd8ddd..235b14a9aa72 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -212,8 +212,8 @@ class ApplyInPandasTestsMixin:
"eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
"SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
"SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
- "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF or "
- "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+ "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF, "
+ "SQL_GROUPED_AGG_PANDAS_ITER_UDF or SQL_GROUPED_AGG_ARROW_ITER_UDF"
},
)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index c71e78609f82..36fe3cfeb8a7 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -542,6 +542,32 @@ class GroupedAggPandasUDFTestsMixin:
expected = [1, 5]
self.assertEqual(actual, expected)
+ def test_register_grouped_agg_iter_udf(self):
+ """Test registering a grouped aggregate iterator UDF for SQL usage."""
+
+ @pandas_udf("integer")
+ def sum_iter_udf(it: Iterator[pd.Series]) -> int:
+ total = 0
+ for series in it:
+ total += series.sum()
+ return total
+
+ self.assertEqual(sum_iter_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF)
+
+ with self.temp_func("sum_iter_udf"):
+ registered_udf = self.spark.udf.register("sum_iter_udf", sum_iter_udf)
+ self.assertEqual(
+ registered_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF
+ )
+
+ q = """
+ SELECT sum_iter_udf(v1)
+ FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2
+ """
+ actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+ expected = [1, 5]
+ self.assertEqual(actual, expected)
+
def test_grouped_with_empty_partition(self):
data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
expected = [Row(id=1, sum=5), Row(id=2, x=4)]
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 14d3f92f053d..34c69d1ffda2 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -681,6 +681,7 @@ class UDFRegistration:
PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
]:
raise PySparkTypeError(
@@ -689,8 +690,8 @@ class UDFRegistration:
"eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
"SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
"SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
- "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF or "
- "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+ "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF, "
+ "SQL_GROUPED_AGG_PANDAS_ITER_UDF or SQL_GROUPED_AGG_ARROW_ITER_UDF"
},
)
source_udf = _create_udf(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]