This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81540fa39b9f [SPARK-54722][PYTHON][SQL] Register Pandas Grouped Iter 
Aggregate UDF for SQL usage
81540fa39b9f is described below

commit 81540fa39b9f7d5ebd2958c42479dde23ededa96
Author: Yicong-Huang <[email protected]>
AuthorDate: Thu Dec 18 10:24:14 2025 +0800

    [SPARK-54722][PYTHON][SQL] Register Pandas Grouped Iter Aggregate UDF for 
SQL usage
    
    ### What changes were proposed in this pull request?
    
    This PR adds `SQL_GROUPED_AGG_PANDAS_ITER_UDF` to the list of supported 
eval types in `UDFRegistration.register()` method, allowing users to register 
Pandas Grouped Iter Aggregate UDFs for SQL usage.
    
    ### Why are the changes needed?
    
    Currently, the iterator API for grouped aggregate Pandas UDFs cannot be 
registered for SQL usage via `spark.udf.register()`. This is inconsistent with 
other UDF types like `SQL_GROUPED_AGG_ARROW_ITER_UDF` which is already 
supported.
    
    With this change, users can now register iterator-based grouped aggregate 
UDFs and use them in SQL queries:
    
    ```python
    @pandas_udf("double")
    def sum_iter_udf(it: Iterator[pd.Series]) -> float:
        total = 0.0
        for series in it:
            total += series.sum()
        return total
    
    spark.udf.register("sum_iter_udf", sum_iter_udf)
    spark.sql("SELECT sum_iter_udf(v) FROM table GROUP BY id")
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now register Pandas Grouped Iter Aggregate UDFs 
(`Iterator[pd.Series] -> scalar`) for SQL usage.
    
    ### How was this patch tested?
    
    Added a new test case `test_register_grouped_agg_iter_udf` in 
`python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53493 from 
Yicong-Huang/SPARK-54722/feat/register-pandas-grouped-iter-agg-udf.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/udf.py                  |  5 +++--
 .../sql/tests/pandas/test_pandas_grouped_map.py    |  4 ++--
 .../tests/pandas/test_pandas_udf_grouped_agg.py    | 26 ++++++++++++++++++++++
 python/pyspark/sql/udf.py                          |  5 +++--
 4 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/connect/udf.py 
b/python/pyspark/sql/connect/udf.py
index a5257ac9d09e..3d61471e8251 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -295,6 +295,7 @@ class UDFRegistration:
                 PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+                PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
             ]:
                 raise PySparkTypeError(
@@ -303,8 +304,8 @@ class UDFRegistration:
                         "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
                         "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
                         "SQL_SCALAR_PANDAS_ITER_UDF, 
SQL_SCALAR_ARROW_ITER_UDF, "
-                        "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF 
or "
-                        "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+                        "SQL_GROUPED_AGG_PANDAS_UDF, 
SQL_GROUPED_AGG_ARROW_UDF, "
+                        "SQL_GROUPED_AGG_PANDAS_ITER_UDF or 
SQL_GROUPED_AGG_ARROW_ITER_UDF"
                     },
                 )
             self.sparkSession._client.register_udf(
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 2f9e4bbd8ddd..235b14a9aa72 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -212,8 +212,8 @@ class ApplyInPandasTestsMixin:
                     "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
                     "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
                     "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
-                    "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF or "
-                    "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+                    "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF, "
+                    "SQL_GROUPED_AGG_PANDAS_ITER_UDF or 
SQL_GROUPED_AGG_ARROW_ITER_UDF"
                 },
             )
 
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index c71e78609f82..36fe3cfeb8a7 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -542,6 +542,32 @@ class GroupedAggPandasUDFTestsMixin:
             expected = [1, 5]
             self.assertEqual(actual, expected)
 
+    def test_register_grouped_agg_iter_udf(self):
+        """Test registering a grouped aggregate iterator UDF for SQL usage."""
+
+        @pandas_udf("integer")
+        def sum_iter_udf(it: Iterator[pd.Series]) -> int:
+            total = 0
+            for series in it:
+                total += series.sum()
+            return total
+
+        self.assertEqual(sum_iter_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF)
+
+        with self.temp_func("sum_iter_udf"):
+            registered_udf = self.spark.udf.register("sum_iter_udf", 
sum_iter_udf)
+            self.assertEqual(
+                registered_udf.evalType, 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF
+            )
+
+            q = """
+                SELECT sum_iter_udf(v1)
+                FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2
+                """
+            actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+            expected = [1, 5]
+            self.assertEqual(actual, expected)
+
     def test_grouped_with_empty_partition(self):
         data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
         expected = [Row(id=1, sum=5), Row(id=2, x=4)]
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 14d3f92f053d..34c69d1ffda2 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -681,6 +681,7 @@ class UDFRegistration:
                 PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
+                PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
                 PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
             ]:
                 raise PySparkTypeError(
@@ -689,8 +690,8 @@ class UDFRegistration:
                         "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
                         "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
                         "SQL_SCALAR_PANDAS_ITER_UDF, 
SQL_SCALAR_ARROW_ITER_UDF, "
-                        "SQL_GROUPED_AGG_PANDAS_UDF, SQL_GROUPED_AGG_ARROW_UDF 
or "
-                        "SQL_GROUPED_AGG_ARROW_ITER_UDF"
+                        "SQL_GROUPED_AGG_PANDAS_UDF, 
SQL_GROUPED_AGG_ARROW_UDF, "
+                        "SQL_GROUPED_AGG_PANDAS_ITER_UDF or 
SQL_GROUPED_AGG_ARROW_ITER_UDF"
                     },
                 )
             source_udf = _create_udf(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to