This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8760286a679 [SPARK-54922][PYTHON] Unify how args are passed to python workers
f8760286a679 is described below

commit f8760286a67973857ec9f1e433b31baf612aa1c9
Author: Tian Gao <[email protected]>
AuthorDate: Wed Jan 7 10:03:28 2026 +0900

    [SPARK-54922][PYTHON] Unify how args are passed to python workers
    
    ### What changes were proposed in this pull request?
    
    Unify the two ways of passing arguments to Python workers.
    
    ### Why are the changes needed?
    
    We are cleaning up the executor <-> worker protocols so that all of the
    protocol-related code lives in one place, which helps future improvements
    such as protocol checking. Having two ways to do one thing is not ideal.
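
    For reference, the unified per-argument layout that the worker now always
    reads (a sketch inferred from the diff below, not normative protocol
    documentation):

        offset : int32 (big-endian)    # column offset of the argument
        named  : bool (1 byte)         # whether the argument is named
        name   : int32 length + UTF-8 bytes, only present when named is true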
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53699 from gaogaotiantian/unify-arg-reading.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/worker.py                           | 36 ++++++----------------
 .../sql/execution/python/PythonUDFRunner.scala     | 21 +------------
 2 files changed, 10 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 386efadce58e..4b2be1bc8587 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1430,33 +1430,15 @@ def wrap_memory_profiler(f, eval_type, result_id):
 def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profiler):
     num_arg = read_int(infile)
 
-    if eval_type in (
-        PythonEvalType.SQL_BATCHED_UDF,
-        PythonEvalType.SQL_ARROW_BATCHED_UDF,
-        PythonEvalType.SQL_SCALAR_PANDAS_UDF,
-        PythonEvalType.SQL_SCALAR_ARROW_UDF,
-        PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
-        PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
-        PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
-        # The below doesn't support named argument, but shares the same protocol.
-        PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-        PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
-        PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
-        PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
-        PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
-    ):
-        args_offsets = []
-        kwargs_offsets = {}
-        for _ in range(num_arg):
-            offset = read_int(infile)
-            if read_bool(infile):
-                name = utf8_deserializer.loads(infile)
-                kwargs_offsets[name] = offset
-            else:
-                args_offsets.append(offset)
-    else:
-        args_offsets = [read_int(infile) for i in range(num_arg)]
-        kwargs_offsets = {}
+    args_offsets = []
+    kwargs_offsets = {}
+    for _ in range(num_arg):
+        offset = read_int(infile)
+        if read_bool(infile):
+            name = utf8_deserializer.loads(infile)
+            kwargs_offsets[name] = offset
+        else:
+            args_offsets.append(offset)
 
     chained_func = None
     for i in range(read_int(infile)):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index aae87bd94834..c728102962cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -173,26 +173,7 @@ object PythonUDFRunner {
       funcs: Seq[(ChainedPythonFunctions, Long)],
       argOffsets: Array[Array[Int]],
       profiler: Option[String]): Unit = {
-    profiler match {
-      case Some(p) =>
-        dataOut.writeBoolean(true)
-        PythonWorkerUtils.writeUTF(p, dataOut)
-      case _ => dataOut.writeBoolean(false)
-    }
-    dataOut.writeInt(funcs.length)
-    funcs.zip(argOffsets).foreach { case ((chained, resultId), offsets) =>
-      dataOut.writeInt(offsets.length)
-      offsets.foreach { offset =>
-        dataOut.writeInt(offset)
-      }
-      dataOut.writeInt(chained.funcs.length)
-      chained.funcs.foreach { f =>
-        PythonWorkerUtils.writePythonFunction(f, dataOut)
-      }
-      if (profiler.isDefined) {
-        dataOut.writeLong(resultId)
-      }
-    }
+    writeUDFs(dataOut, funcs, argOffsets.map(_.map(ArgumentMetadata(_, None, false))), profiler)
   }
 
   def writeUDFs(
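
A minimal round-trip sketch of this unified framing in Python (the
write_arg/read_args helpers are hypothetical; struct mimics the big-endian
ints and booleans written by the JVM's DataOutputStream, and the
length-prefixed UTF-8 matches what pyspark's utf8_deserializer expects):

    import io
    import struct

    def write_arg(out, offset, name=None):
        out.write(struct.pack(">i", offset))            # argument offset
        out.write(struct.pack(">?", name is not None))  # named-argument flag
        if name is not None:
            encoded = name.encode("utf-8")
            out.write(struct.pack(">i", len(encoded)))  # UTF-8 length prefix
            out.write(encoded)

    def read_args(infile, num_arg):
        args_offsets, kwargs_offsets = [], {}
        for _ in range(num_arg):
            (offset,) = struct.unpack(">i", infile.read(4))
            (named,) = struct.unpack(">?", infile.read(1))
            if named:
                (length,) = struct.unpack(">i", infile.read(4))
                kwargs_offsets[infile.read(length).decode("utf-8")] = offset
            else:
                args_offsets.append(offset)
        return args_offsets, kwargs_offsets

    buf = io.BytesIO()
    write_arg(buf, 0)                # positional argument at offset 0
    write_arg(buf, 1, name="scale")  # named argument "scale" at offset 1
    buf.seek(0)
    print(read_args(buf, 2))         # ([0], {'scale': 1})

Note that the removed non-named read path consumed bare int offsets with no
flag byte, which is why writeUDF above now wraps each plain offset in
ArgumentMetadata before delegating to writeUDFs: the flag is always on the
wire, so a single read loop on the worker suffices.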


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
