This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6695a6911e78 [SPARK-54701][PYTHON] Improve the runnerConf chain for 
Python workers
6695a6911e78 is described below

commit 6695a6911e7810dee7ffa2fca23aa6f83f6bdd7b
Author: Tian Gao <[email protected]>
AuthorDate: Fri Jan 2 09:05:18 2026 +0900

    [SPARK-54701][PYTHON] Improve the runnerConf chain for Python workers
    
    ### What changes were proposed in this pull request?
    
    `runnerConf` now honors the parent `runnerConf`: each subclass extends the 
inherited `runnerConf` instead of overwriting it.
    
    ### Why are the changes needed?
    
    To allow any class in the inheritance chain to add extra configs 
for the runner.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53462 from gaogaotiantian/improve-runnerConf-inherot.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +-
 .../org/apache/spark/sql/execution/python/ArrowPythonRunner.scala | 8 ++++++--
 .../apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala | 4 +++-
 .../spark/sql/execution/python/CoGroupedArrowPythonRunner.scala   | 4 +++-
 .../python/streaming/ApplyInPandasWithStatePythonRunner.scala     | 8 +++++---
 .../streaming/TransformWithStateInPySparkPythonRunner.scala       | 8 +++++---
 6 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index ccc61986d176..878f27961d5d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -214,7 +214,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
 
-  protected val runnerConf: Map[String, String] = Map.empty
+  protected def runnerConf: Map[String, String] = Map.empty
 
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars: java.util.Map[String, String] = 
funcs.head.funcs.head.envVars
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 499fa99a2444..a644a45bc59d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -104,7 +104,7 @@ class ArrowPythonRunner(
     _schema: StructType,
     _timeZoneId: String,
     largeVarTypes: Boolean,
-    protected override val runnerConf: Map[String, String],
+    pythonRunnerConf: Map[String, String],
     pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
     sessionUUID: Option[String],
@@ -113,6 +113,8 @@ class ArrowPythonRunner(
     funcs, evalType, argOffsets, _schema, _timeZoneId, largeVarTypes,
     pythonMetrics, jobArtifactUUID, sessionUUID) {
 
+  override protected def runnerConf: Map[String, String] = super.runnerConf ++ 
pythonRunnerConf
+
   override protected def writeUDF(dataOut: DataOutputStream): Unit =
     PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets, profiler)
 }
@@ -128,7 +130,7 @@ class ArrowPythonWithNamedArgumentRunner(
     _schema: StructType,
     _timeZoneId: String,
     largeVarTypes: Boolean,
-    protected override val runnerConf: Map[String, String],
+    pythonRunnerConf: Map[String, String],
     pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
     sessionUUID: Option[String],
@@ -137,6 +139,8 @@ class ArrowPythonWithNamedArgumentRunner(
     funcs, evalType, argMetas.map(_.map(_.offset)), _schema, _timeZoneId, 
largeVarTypes,
     pythonMetrics, jobArtifactUUID, sessionUUID) {
 
+  override protected def runnerConf: Map[String, String] = super.runnerConf ++ 
pythonRunnerConf
+
   override protected def writeUDF(dataOut: DataOutputStream): Unit = {
     if (evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF) {
       PythonWorkerUtils.writeUTF(schema.json, dataOut)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index 979d91205d5a..e5c7be2f4070 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -39,7 +39,7 @@ class ArrowPythonUDTFRunner(
     protected override val schema: StructType,
     protected override val timeZoneId: String,
     protected override val largeVarTypes: Boolean,
-    protected override val runnerConf: Map[String, String],
+    pythonRunnerConf: Map[String, String],
     override val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
     sessionUUID: Option[String])
@@ -49,6 +49,8 @@ class ArrowPythonUDTFRunner(
   with BatchedPythonArrowInput
   with BasicPythonArrowOutput {
 
+  override protected def runnerConf: Map[String, String] = super.runnerConf ++ 
pythonRunnerConf
+
   override protected def writeUDF(dataOut: DataOutputStream): Unit = {
     // For arrow-optimized Python UDTFs (@udtf(useArrow=True)), we need to 
write
     // the schema to the worker to support UDT (user-defined type).
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index b5986be9214a..e99fc1560e8e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -45,7 +45,7 @@ class CoGroupedArrowPythonRunner(
     rightSchema: StructType,
     timeZoneId: String,
     largeVarTypes: Boolean,
-    protected override val runnerConf: Map[String, String],
+    pythonRunnerConf: Map[String, String],
     override val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
     sessionUUID: Option[String],
@@ -55,6 +55,8 @@ class CoGroupedArrowPythonRunner(
     funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with BasicPythonArrowOutput {
 
+  override protected def runnerConf: Map[String, String] = super.runnerConf ++ 
pythonRunnerConf
+
   override val envVars: util.Map[String, String] = {
     val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars)
     sessionUUID.foreach { uuid =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index ae89ff1637ed..a4909535881d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -113,9 +113,11 @@ class ApplyInPandasWithStatePythonRunner(
   // applyInPandasWithState has its own mechanism to construct the Arrow 
RecordBatch instance.
   // Configurations are both applied to executor and Python worker, set them 
to the worker conf
   // to let Python worker read the config properly.
-  override protected val runnerConf: Map[String, String] = initialRunnerConf +
-    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString) +
-    (SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> 
arrowMaxBytesPerBatch.toString)
+  override protected def runnerConf: Map[String, String] =
+    super.runnerConf ++ initialRunnerConf ++ Map(
+      SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString,
+      SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> 
arrowMaxBytesPerBatch.toString
+    )
 
   private val stateRowDeserializer = stateEncoder.createDeserializer()
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
index bbf7b9387526..2ea3889be053 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
@@ -238,9 +238,11 @@ abstract class 
TransformWithStateInPySparkPythonBaseRunner[I](
   protected val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
   protected val arrowMaxBytesPerBatch = sqlConf.arrowMaxBytesPerBatch
 
-  override protected val runnerConf: Map[String, String] = initialRunnerConf +
-    (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString) +
-    (SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> 
arrowMaxBytesPerBatch.toString)
+  override protected def runnerConf: Map[String, String] =
+    super.runnerConf ++ initialRunnerConf ++ Map(
+      SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> 
arrowMaxRecordsPerBatch.toString,
+      SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> 
arrowMaxBytesPerBatch.toString
+    )
 
   // Use lazy val to initialize the fields before these are accessed in 
[[PythonArrowInput]]'s
   // constructor.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to