This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6695a6911e78 [SPARK-54701][PYTHON] Improve the runnerConf chain for Python workers
6695a6911e78 is described below
commit 6695a6911e7810dee7ffa2fca23aa6f83f6bdd7b
Author: Tian Gao <[email protected]>
AuthorDate: Fri Jan 2 09:05:18 2026 +0900
[SPARK-54701][PYTHON] Improve the runnerConf chain for Python workers
### What changes were proposed in this pull request?
`runnerConf` now honors the parent `runnerConf`: it inherits the parent's `runnerConf` instead of overwriting it.
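Conceptually, the pattern looks like the following sketch (the class names here are illustrative, not the actual runner classes):

```scala
// Illustrative sketch only: runnerConf becomes a def, and each subclass
// merges its own entries on top of whatever the parent already provides.
abstract class BaseRunnerSketch {
  protected def runnerConf: Map[String, String] = Map.empty
}

class ArrowRunnerSketch(pythonRunnerConf: Map[String, String]) extends BaseRunnerSketch {
  // Inherit the parent's conf and layer this runner's entries on top,
  // instead of replacing the inherited map.
  override protected def runnerConf: Map[String, String] =
    super.runnerConf ++ pythonRunnerConf
}
```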
### Why are the changes needed?
To allow any class in the runner inheritance chain to add extra configs for the Python worker, as sketched below.
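Building on the sketch above, a runner further down the chain only has to merge its own entries on top of `super.runnerConf`; with `++`, the entry merged last wins on key collisions (hypothetical key and class below):

```scala
// Hypothetical subclass adding one extra runner config on top of the inherited ones.
class StreamingRunnerSketch(pythonRunnerConf: Map[String, String])
    extends ArrowRunnerSketch(pythonRunnerConf) {
  override protected def runnerConf: Map[String, String] =
    super.runnerConf ++ Map("example.maxRecordsPerBatch" -> "10000")
}
```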
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53462 from gaogaotiantian/improve-runnerConf-inherot.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +-
.../org/apache/spark/sql/execution/python/ArrowPythonRunner.scala | 8 ++++++--
.../apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala | 4 +++-
.../spark/sql/execution/python/CoGroupedArrowPythonRunner.scala | 4 +++-
.../python/streaming/ApplyInPandasWithStatePythonRunner.scala | 8 +++++---
.../streaming/TransformWithStateInPySparkPythonRunner.scala | 8 +++++---
6 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index ccc61986d176..878f27961d5d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -214,7 +214,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val hideTraceback: Boolean = false
protected val simplifiedTraceback: Boolean = false
- protected val runnerConf: Map[String, String] = Map.empty
+ protected def runnerConf: Map[String, String] = Map.empty
// All the Python functions should have the same exec, version and envvars.
protected val envVars: java.util.Map[String, String] =
funcs.head.funcs.head.envVars
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 499fa99a2444..a644a45bc59d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -104,7 +104,7 @@ class ArrowPythonRunner(
_schema: StructType,
_timeZoneId: String,
largeVarTypes: Boolean,
- protected override val runnerConf: Map[String, String],
+ pythonRunnerConf: Map[String, String],
pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
sessionUUID: Option[String],
@@ -113,6 +113,8 @@ class ArrowPythonRunner(
funcs, evalType, argOffsets, _schema, _timeZoneId, largeVarTypes,
pythonMetrics, jobArtifactUUID, sessionUUID) {
+ override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
+
override protected def writeUDF(dataOut: DataOutputStream): Unit =
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets, profiler)
}
@@ -128,7 +130,7 @@ class ArrowPythonWithNamedArgumentRunner(
_schema: StructType,
_timeZoneId: String,
largeVarTypes: Boolean,
- protected override val runnerConf: Map[String, String],
+ pythonRunnerConf: Map[String, String],
pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
sessionUUID: Option[String],
@@ -137,6 +139,8 @@ class ArrowPythonWithNamedArgumentRunner(
funcs, evalType, argMetas.map(_.map(_.offset)), _schema, _timeZoneId, largeVarTypes,
pythonMetrics, jobArtifactUUID, sessionUUID) {
+ override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
+
override protected def writeUDF(dataOut: DataOutputStream): Unit = {
if (evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF) {
PythonWorkerUtils.writeUTF(schema.json, dataOut)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index 979d91205d5a..e5c7be2f4070 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -39,7 +39,7 @@ class ArrowPythonUDTFRunner(
protected override val schema: StructType,
protected override val timeZoneId: String,
protected override val largeVarTypes: Boolean,
- protected override val runnerConf: Map[String, String],
+ pythonRunnerConf: Map[String, String],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
sessionUUID: Option[String])
@@ -49,6 +49,8 @@ class ArrowPythonUDTFRunner(
with BatchedPythonArrowInput
with BasicPythonArrowOutput {
+ override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
+
override protected def writeUDF(dataOut: DataOutputStream): Unit = {
// For arrow-optimized Python UDTFs (@udtf(useArrow=True)), we need to write
// the schema to the worker to support UDT (user-defined type).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index b5986be9214a..e99fc1560e8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -45,7 +45,7 @@ class CoGroupedArrowPythonRunner(
rightSchema: StructType,
timeZoneId: String,
largeVarTypes: Boolean,
- protected override val runnerConf: Map[String, String],
+ pythonRunnerConf: Map[String, String],
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
sessionUUID: Option[String],
@@ -55,6 +55,8 @@ class CoGroupedArrowPythonRunner(
funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with BasicPythonArrowOutput {
+ override protected def runnerConf: Map[String, String] = super.runnerConf ++ pythonRunnerConf
+
override val envVars: util.Map[String, String] = {
val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars)
sessionUUID.foreach { uuid =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index ae89ff1637ed..a4909535881d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -113,9 +113,11 @@ class ApplyInPandasWithStatePythonRunner(
// applyInPandasWithState has its own mechanism to construct the Arrow RecordBatch instance.
// Configurations are both applied to executor and Python worker, set them to the worker conf
// to let Python worker read the config properly.
- override protected val runnerConf: Map[String, String] = initialRunnerConf +
-   (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString) +
-   (SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> arrowMaxBytesPerBatch.toString)
+ override protected def runnerConf: Map[String, String] =
+   super.runnerConf ++ initialRunnerConf ++ Map(
+     SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString,
+     SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> arrowMaxBytesPerBatch.toString
+   )
private val stateRowDeserializer = stateEncoder.createDeserializer()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
index bbf7b9387526..2ea3889be053 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala
@@ -238,9 +238,11 @@ abstract class TransformWithStateInPySparkPythonBaseRunner[I](
protected val arrowMaxRecordsPerBatch = sqlConf.arrowMaxRecordsPerBatch
protected val arrowMaxBytesPerBatch = sqlConf.arrowMaxBytesPerBatch
- override protected val runnerConf: Map[String, String] = initialRunnerConf +
-   (SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString) +
-   (SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> arrowMaxBytesPerBatch.toString)
+ override protected def runnerConf: Map[String, String] =
+   super.runnerConf ++ initialRunnerConf ++ Map(
+     SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key -> arrowMaxRecordsPerBatch.toString,
+     SQLConf.ARROW_EXECUTION_MAX_BYTES_PER_BATCH.key -> arrowMaxBytesPerBatch.toString
+   )
// Use lazy val to initialize the fields before these are accessed in [[PythonArrowInput]]'s
// constructor.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]