This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ac46ea6aa8ce [SPARK-55821][PYTHON] Enforce keyword-only arguments in 
serializer __init__ methods
ac46ea6aa8ce is described below

commit ac46ea6aa8ce480d042f242125e050ea9a7dcbbf
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Mar 5 17:37:20 2026 +0800

    [SPARK-55821][PYTHON] Enforce keyword-only arguments in serializer __init__ 
methods
    
    ### What changes were proposed in this pull request?
    
    Add a `*` separator to enforce keyword-only arguments in all serializer
    `__init__` methods in `pyspark.sql.pandas.serializers`, and convert all call
    sites in `worker.py` from positional to keyword arguments.
    
    ### Why are the changes needed?
    
    As noted by zhengruifeng in 
https://github.com/apache/spark/pull/54568#discussion_r2875751193, serializer 
constructors accept too many positional arguments, making call sites 
error-prone and hard to read. Enforcing keyword-only arguments prevents 
positional mistakes and improves readability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54605 from 
Yicong-Huang/SPARK-55821/refactor/keyword-only-serializer-init.
    
    Lead-authored-by: Yicong Huang 
<[email protected]>
    Co-authored-by: Yicong-Huang 
<[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/pandas/serializers.py |  54 +++++++++-------
 python/pyspark/worker.py                 | 104 ++++++++++++++++---------------
 2 files changed, 85 insertions(+), 73 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 019a88472648..84f25a7bb9f5 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -267,7 +267,7 @@ class 
ArrowStreamArrowUDTFSerializer(ArrowStreamUDTFSerializer):
     Serializer for PyArrow-native UDTFs that work directly with PyArrow 
RecordBatches and Arrays.
     """
 
-    def __init__(self, table_arg_offsets=None):
+    def __init__(self, *, table_arg_offsets=None):
         super().__init__()
         self.table_arg_offsets = table_arg_offsets if table_arg_offsets else []
 
@@ -360,7 +360,7 @@ class 
ArrowStreamGroupUDFSerializer(ArrowStreamUDFSerializer):
         If True, reorder serialized columns by schema name.
     """
 
-    def __init__(self, assign_cols_by_name):
+    def __init__(self, *, assign_cols_by_name):
         super().__init__()
         self._assign_cols_by_name = assign_cols_by_name
 
@@ -427,6 +427,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         int_to_decimal_coercion_enabled: bool = False,
@@ -507,6 +508,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -522,16 +524,16 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
         is_udtf: bool = False,
     ):
         super().__init__(
-            timezone,
-            safecheck,
-            int_to_decimal_coercion_enabled,
-            prefers_large_types,
-            struct_in_pandas,
-            ndarray_as_list,
-            prefer_int_ext_dtype,
-            df_for_struct,
-            input_type,
-            arrow_cast,
+            timezone=timezone,
+            safecheck=safecheck,
+            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
+            prefers_large_types=prefers_large_types,
+            struct_in_pandas=struct_in_pandas,
+            ndarray_as_list=ndarray_as_list,
+            prefer_int_ext_dtype=prefer_int_ext_dtype,
+            df_for_struct=df_for_struct,
+            input_type=input_type,
+            arrow_cast=arrow_cast,
         )
         self._assign_cols_by_name = assign_cols_by_name
         self._ignore_unexpected_complex_type_values = 
ignore_unexpected_complex_type_values
@@ -594,6 +596,7 @@ class ArrowStreamArrowUDFSerializer(ArrowStreamSerializer):
 
     def __init__(
         self,
+        *,
         safecheck,
         arrow_cast,
     ):
@@ -655,6 +658,7 @@ class 
ArrowBatchUDFSerializer(ArrowStreamArrowUDFSerializer):
 
     def __init__(
         self,
+        *,
         safecheck: bool,
         input_type: StructType,
         int_to_decimal_coercion_enabled: bool,
@@ -751,6 +755,7 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         input_type,
@@ -818,6 +823,7 @@ class 
ArrowStreamAggArrowUDFSerializer(ArrowStreamArrowUDFSerializer):
 class ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -873,6 +879,7 @@ class 
ArrowStreamAggPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
 class GroupPandasUDFSerializer(ArrowStreamPandasUDFSerializer):
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -989,6 +996,7 @@ class 
ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -1388,6 +1396,7 @@ class 
TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -1523,6 +1532,7 @@ class 
TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe
 
     def __init__(
         self,
+        *,
         timezone,
         safecheck,
         assign_cols_by_name,
@@ -1532,13 +1542,13 @@ class 
TransformWithStateInPandasInitStateSerializer(TransformWithStateInPandasSe
         int_to_decimal_coercion_enabled,
     ):
         super().__init__(
-            timezone,
-            safecheck,
-            assign_cols_by_name,
-            prefer_int_ext_dtype,
-            arrow_max_records_per_batch,
-            arrow_max_bytes_per_batch,
-            int_to_decimal_coercion_enabled,
+            timezone=timezone,
+            safecheck=safecheck,
+            assign_cols_by_name=assign_cols_by_name,
+            prefer_int_ext_dtype=prefer_int_ext_dtype,
+            arrow_max_records_per_batch=arrow_max_records_per_batch,
+            arrow_max_bytes_per_batch=arrow_max_bytes_per_batch,
+            int_to_decimal_coercion_enabled=int_to_decimal_coercion_enabled,
         )
         self.init_key_offsets = None
 
@@ -1678,7 +1688,7 @@ class 
TransformWithStateInPySparkRowSerializer(ArrowStreamUDFSerializer):
         Limit of the number of records that can be written to a single 
ArrowRecordBatch in memory.
     """
 
-    def __init__(self, arrow_max_records_per_batch):
+    def __init__(self, *, arrow_max_records_per_batch):
         super().__init__()
         self.arrow_max_records_per_batch = (
             arrow_max_records_per_batch if arrow_max_records_per_batch > 0 
else 2**31 - 1
@@ -1777,8 +1787,8 @@ class 
TransformWithStateInPySparkRowInitStateSerializer(TransformWithStateInPySp
     Same as input parameters in TransformWithStateInPySparkRowSerializer.
     """
 
-    def __init__(self, arrow_max_records_per_batch):
-        super().__init__(arrow_max_records_per_batch)
+    def __init__(self, *, arrow_max_records_per_batch):
+        super().__init__(arrow_max_records_per_batch=arrow_max_records_per_batch)
         self.init_key_offsets = None
 
     def load_stream(self, stream):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1892dcbf3bf6..84e275936fb8 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1493,8 +1493,8 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
         if runner_conf.use_legacy_pandas_udtf_conversion:
             # NOTE: if timezone is set here, that implies 
respectSessionTimeZone is True
             ser = ArrowStreamPandasUDTFSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
                 input_type=input_type,
                 prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
@@ -2685,7 +2685,7 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, 
eval_conf):
             eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
             or eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF
         ):
-            ser = ArrowStreamGroupUDFSerializer(runner_conf.assign_cols_by_name)
+            ser = ArrowStreamGroupUDFSerializer(assign_cols_by_name=runner_conf.assign_cols_by_name)
         elif eval_type in (
             PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
             PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
@@ -2698,70 +2698,72 @@ def read_udfs(pickleSer, infile, eval_type, 
runner_conf, eval_conf):
             PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
         ):
             ser = ArrowStreamAggPandasUDFSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                runner_conf.prefer_int_ext_dtype,
-                runner_conf.int_to_decimal_coercion_enabled,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif (
             eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
             or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
         ):
             ser = GroupPandasUDFSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                runner_conf.prefer_int_ext_dtype,
-                runner_conf.int_to_decimal_coercion_enabled,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
-            ser = CogroupArrowUDFSerializer(runner_conf.assign_cols_by_name)
+            ser = CogroupArrowUDFSerializer(assign_cols_by_name=runner_conf.assign_cols_by_name)
         elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
             ser = CogroupPandasUDFSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
                 prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
                 arrow_cast=True,
             )
         elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
             ser = ApplyInPandasWithStateSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                runner_conf.prefer_int_ext_dtype,
-                eval_conf.state_value_schema,
-                runner_conf.arrow_max_records_per_batch,
-                runner_conf.use_large_var_types,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                state_object_schema=eval_conf.state_value_schema,
+                
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+                prefers_large_var_types=runner_conf.use_large_var_types,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
             ser = TransformWithStateInPandasSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                runner_conf.prefer_int_ext_dtype,
-                runner_conf.arrow_max_records_per_batch,
-                runner_conf.arrow_max_bytes_per_batch,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+                
arrow_max_bytes_per_batch=runner_conf.arrow_max_bytes_per_batch,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == 
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_INIT_STATE_UDF:
             ser = TransformWithStateInPandasInitStateSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                runner_conf.prefer_int_ext_dtype,
-                runner_conf.arrow_max_records_per_batch,
-                runner_conf.arrow_max_bytes_per_batch,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                
arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch,
+                
arrow_max_bytes_per_batch=runner_conf.arrow_max_bytes_per_batch,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
             )
         elif eval_type == 
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_UDF:
-            ser = TransformWithStateInPySparkRowSerializer(runner_conf.arrow_max_records_per_batch)
+            ser = TransformWithStateInPySparkRowSerializer(
+                arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch
+            )
         elif eval_type == 
PythonEvalType.SQL_TRANSFORM_WITH_STATE_PYTHON_ROW_INIT_STATE_UDF:
             ser = TransformWithStateInPySparkRowInitStateSerializer(
-                runner_conf.arrow_max_records_per_batch
+                arrow_max_records_per_batch=runner_conf.arrow_max_records_per_batch
             )
         elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
             ser = ArrowStreamSerializer(write_start_stream=True)
@@ -2777,10 +2779,10 @@ def read_udfs(pickleSer, infile, eval_type, 
runner_conf, eval_conf):
         ):
             input_type = 
_parse_datatype_json_string(utf8_deserializer.loads(infile))
             ser = ArrowBatchUDFSerializer(
-                runner_conf.safecheck,
-                input_type,
-                runner_conf.int_to_decimal_coercion_enabled,
-                runner_conf.binary_as_bytes,
+                safecheck=runner_conf.safecheck,
+                input_type=input_type,
+                
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
+                binary_as_bytes=runner_conf.binary_as_bytes,
             )
         else:
             # Scalar Pandas UDF handles struct type arguments as pandas 
DataFrames instead of
@@ -2803,15 +2805,15 @@ def read_udfs(pickleSer, infile, eval_type, 
runner_conf, eval_conf):
             )
 
             ser = ArrowStreamPandasUDFSerializer(
-                runner_conf.timezone,
-                runner_conf.safecheck,
-                runner_conf.assign_cols_by_name,
-                df_for_struct,
-                struct_in_pandas,
-                ndarray_as_list,
-                runner_conf.prefer_int_ext_dtype,
-                True,
-                input_type,
+                timezone=runner_conf.timezone,
+                safecheck=runner_conf.safecheck,
+                assign_cols_by_name=runner_conf.assign_cols_by_name,
+                df_for_struct=df_for_struct,
+                struct_in_pandas=struct_in_pandas,
+                ndarray_as_list=ndarray_as_list,
+                prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype,
+                arrow_cast=True,
+                input_type=input_type,
                 
int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled,
                 prefers_large_types=runner_conf.use_large_var_types,
             )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to