This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 30540df11bb9 [SPARK-51814][SS][PYTHON][FOLLOWUP] Sync missing Python function types
30540df11bb9 is described below

commit 30540df11bb960e58d8be0409fb7080e474c2bf7
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Sun Apr 27 16:03:14 2025 +0900

    [SPARK-51814][SS][PYTHON][FOLLOWUP] Sync missing Python function types
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to sync the missing Python function types that are out of sync between Scala and Python.
    
    ### Why are the changes needed?
    
    These types are supposed to be in sync between Scala and Python.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50728 from HeartSaVioR/SPARK-51814-follow-up-sync-function-type.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/functions.py |  8 ++++----
 python/pyspark/sql/pandas/group_ops.py |  4 ++--
 python/pyspark/util.py                 |  4 ++--
 python/pyspark/worker.py               | 20 ++++++++++----------
 4 files changed, 18 insertions(+), 18 deletions(-)
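
For reference, a minimal sketch of the rename this patch applies (names and
values are taken from the diff below; only the identifiers change, and the
numeric eval-type values 213/214 stay the same):

    # Before (python/pyspark/util.py)
    SQL_TRANSFORM_WITH_STATE_UDF = 213
    SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF = 214

    # After, matching the Scala-side names
    SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF = 213
    SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF = 214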

diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index 9351dbdfd25b..be8ffacfa3d7 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -415,8 +415,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
         PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
         PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
         None,
@@ -459,8 +459,8 @@ def _validate_pandas_udf(f, evalType) -> int:
         PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
         PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF,
         PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
         PythonEvalType.SQL_ARROW_BATCHED_UDF,
diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py
index 5b2ee83beec0..5fe711f742ce 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -654,10 +654,10 @@ class PandasGroupedOpsMixin:
         elif usePandas and initialState is not None:
             functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF
         elif not usePandas and initialState is None:
-            functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF
+            functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF
         else:
             # not usePandas and initialState is not None
-            functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF
+            functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF
 
         if initialState is None:
             initial_state_java_obj = None
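
The hunk above shows the tail of the eval-type dispatch; a condensed sketch of
the full selection logic after this patch follows. The first branch (usePandas
with no initial state) sits outside the hunk and is reconstructed here as an
assumption:

    # Condensed sketch of the eval-type dispatch in group_ops.py.
    if usePandas and initialState is None:
        # assumed branch, not shown in the hunk above
        functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF
    elif usePandas and initialState is not None:
        functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF
    elif not usePandas and initialState is None:
        functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF
    else:  # not usePandas and initialState is not None
        functionType = PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF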
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index 605f4f070b5a..7e07d95538e4 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -641,8 +641,8 @@ class PythonEvalType:
     SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF: "PandasGroupedMapUDFTransformWithStateInitStateType" = (  # noqa: E501
         212
     )
-    SQL_TRANSFORM_WITH_STATE_UDF: "GroupedMapUDFTransformWithStateType" = 213
-    SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF: "GroupedMapUDFTransformWithStateInitStateType" = (  # noqa: E501
+    SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF: "GroupedMapUDFTransformWithStateType" = 213
+    SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF: "GroupedMapUDFTransformWithStateInitStateType" = (  # noqa: E501
         214
     )
     SQL_TABLE_UDF: "SQLTableUDFType" = 300
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 61c006b1a9cf..5f4408851c67 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -969,9 +969,9 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil
         return args_offsets, wrap_grouped_transform_with_state_pandas_init_state_udf(
             func, return_type, runner_conf
         )
-    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
+    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
         return args_offsets, wrap_grouped_transform_with_state_udf(func, return_type, runner_conf)
-    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
+    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
         return args_offsets, wrap_grouped_transform_with_state_init_state_udf(
             func, return_type, runner_conf
         )
@@ -1572,8 +1572,8 @@ def read_udfs(pickleSer, infile, eval_type):
         PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF,
         PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF,
-        PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF,
+        PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF,
     ):
         # Load conf used for pandas_udf evaluation
         num_conf = read_int(infile)
@@ -1588,8 +1588,8 @@ def read_udfs(pickleSer, infile, eval_type):
         elif (
             eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF
             or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF
-            or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF
-            or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF
+            or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF
+            or eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF
         ):
             state_server_port = read_int(infile)
             if state_server_port == -1:
@@ -1641,14 +1641,14 @@ def read_udfs(pickleSer, infile, eval_type):
             ser = TransformWithStateInPandasInitStateSerializer(
                 timezone, safecheck, _assign_cols_by_name, arrow_max_records_per_batch
             )
-        elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
+        elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
             arrow_max_records_per_batch = runner_conf.get(
                 "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
             )
             arrow_max_records_per_batch = int(arrow_max_records_per_batch)
 
             ser = TransformWithStateInPySparkRowSerializer(arrow_max_records_per_batch)
-        elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
+        elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
             arrow_max_records_per_batch = runner_conf.get(
                 "spark.sql.execution.arrow.maxRecordsPerBatch", 10000
             )
@@ -1889,7 +1889,7 @@ def read_udfs(pickleSer, infile, eval_type):
                 # mode == PROCESS_TIMER or mode == COMPLETE
                 return f(stateful_processor_api_client, mode, None, iter([]))
 
-    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_UDF:
+    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
         # We assume there is only one UDF here because grouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
@@ -1916,7 +1916,7 @@ def read_udfs(pickleSer, infile, eval_type):
                 # mode == PROCESS_TIMER or mode == COMPLETE
                 return f(stateful_processor_api_client, mode, None, iter([]))
 
-    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_INIT_STATE_UDF:
+    elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
         # We assume there is only one UDF here because grouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
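
As a usage note, here is a minimal sketch of the worker-side serializer
selection for the renamed row-based eval type, following the worker.py hunk
above (runner_conf, the conf key, and the serializer class all appear in the
diff; this is a condensation, not the full dispatch):

    if eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
        # Batch size comes from the Arrow conf, defaulting to 10000.
        arrow_max_records_per_batch = int(
            runner_conf.get("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
        )
        ser = TransformWithStateInPySparkRowSerializer(arrow_max_records_per_batch)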

