This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1a8859d24763 [SPARK-52579][PYTHON] Set periodical traceback dump for Python workers
1a8859d24763 is described below
commit 1a8859d247635a22573311b3b935fed0ae3f10b1
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Jun 26 12:50:50 2025 +0900
[SPARK-52579][PYTHON] Set periodical traceback dump for Python workers
### What changes were proposed in this pull request?
Sets a periodic traceback dump for Python workers.
To enable:
- `spark.python.worker.tracebackDumpIntervalSeconds` (SparkConf, default `0`)
> The interval (in seconds) for Python workers to dump their tracebacks. If it's positive, the Python worker will periodically dump the traceback into its executor's `stderr`. The default is `0`, which means it is disabled.
- `spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds` (SQLConf, falls back to the above)
> Same as spark.python.worker.tracebackDumpIntervalSeconds for Python execution with DataFrame and SQL. It can change during runtime.
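For example, the static SparkConf variant above can be set when building the session. A minimal sketch; the app name and the 60-second interval are illustrative, only the config key comes from this PR:
```py
from pyspark.sql import SparkSession

# Static conf: read when Python workers are launched. Use the SQLConf
# variant above to change the interval at runtime for DataFrame/SQL UDFs.
spark = (
    SparkSession.builder
    .appName("traceback-dump-demo")  # illustrative name
    .config("spark.python.worker.tracebackDumpIntervalSeconds", "60")
    .getOrCreate()
)
```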
### Why are the changes needed?
To monitor the progress of Python workers.
### Does this PR introduce _any_ user-facing change?
Yes, the traceback will be dumped periodically when the config is set to a
positive number.
```py
>>> from pyspark.sql.functions import *
>>> import time
>>>
>>> udf("long")
... def f(x):
... time.sleep(12)
... return x
...
>>> df = spark.range(1).select(f(col("id")))
>>>
spark.conf.set('spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds',
5)
>>>
>>> df.show()
Timeout (0:00:05)!
Thread 0x00000001ede60f80 (most recent call first):
File "<stdin>", line 3 in f
File "/.../python/pyspark/util.py", line 135 in wrapper
File "/.../python/pyspark/worker.py", line 121 in <lambda>
...
Timeout (0:00:05)!
Thread 0x00000001ede60f80 (most recent call first):
File "<stdin>", line 3 in f
File "/.../python/pyspark/util.py", line 135 in wrapper
File "/.../python/pyspark/worker.py", line 121 in <lambda>
...
+-----+
|f(id)|
+-----+
| 0|
+-----+
```
### How was this patch tested?
Manually, and with existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51286 from ueshin/issues/SPARK-52579/traceback.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/api/python/PythonRunner.scala | 5 +++++
.../main/scala/org/apache/spark/internal/config/Python.scala | 10 ++++++++++
python/pyspark/worker.py | 9 +++++++++
.../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
.../apache/spark/sql/execution/python/ArrowPythonRunner.scala | 2 ++
.../spark/sql/execution/python/ArrowPythonUDTFRunner.scala | 2 ++
.../sql/execution/python/CoGroupedArrowPythonRunner.scala | 2 ++
.../apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 ++
.../python/streaming/ApplyInPandasWithStatePythonRunner.scala | 2 ++
.../sql/execution/python/streaming/PythonForeachWriter.scala | 2 ++
10 files changed, 47 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 287dc8694228..3ef55ce9def0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -170,6 +170,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val faultHandlerEnabled: Boolean = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
protected val idleTimeoutSeconds: Long = conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
protected val killOnIdleTimeout: Boolean = conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
+ protected val tracebackDumpIntervalSeconds: Long =
+ conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
protected val hideTraceback: Boolean = false
protected val simplifiedTraceback: Boolean = false
@@ -267,6 +269,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (faultHandlerEnabled) {
envVars.put("PYTHON_FAULTHANDLER_DIR", faultHandlerLogDir.toString)
}
+ if (tracebackDumpIntervalSeconds > 0L) {
+ envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS",
tracebackDumpIntervalSeconds.toString)
+ }
// allow the user to set the batch size for the BatchedSerializer on UDFs
envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 46d54be92f3d..8c3adedb372a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -117,4 +117,14 @@ private[spark] object Python {
.version("4.1.0")
.booleanConf
.createWithDefault(false)
+
+ val PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+ ConfigBuilder("spark.python.worker.tracebackDumpIntervalSeconds")
+ .doc("The interval (in seconds) for Python workers to dump their
tracebacks. " +
+ "If it's positive, the Python worker will periodically dump the
traceback into " +
+ "its executor's `stderr`. The default is `0` that means it is
disabled.")
+ .version("4.1.0")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(_ >= 0, "The interval should be 0 or positive.")
+ .createWithDefault(0)
}
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 67cf25cff6e6..3454c9855a58 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2347,6 +2347,9 @@ def read_udfs(pickleSer, infile, eval_type):
def main(infile, outfile):
faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+ tracebackDumpIntervalSeconds = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
+ if tracebackDumpIntervalSeconds is not None:
+ tracebackDumpIntervalSeconds = int(tracebackDumpIntervalSeconds)
try:
if faulthandler_log_path:
faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
@@ -2358,6 +2361,9 @@ def main(infile, outfile):
if split_index == -1: # for unit tests
sys.exit(-1)
+ if tracebackDumpIntervalSeconds is not None and tracebackDumpIntervalSeconds > 0:
+ faulthandler.dump_traceback_later(tracebackDumpIntervalSeconds, repeat=True)
+
check_python_version(infile)
# read inputs only for a barrier task
@@ -2465,6 +2471,9 @@ def main(infile, outfile):
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
sys.exit(-1)
+ # Force to cancel dump_traceback_later
+ faulthandler.cancel_dump_traceback_later()
+
if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
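For context, the dump is driven by the standard-library `faulthandler` timer used in `worker.py` above. A minimal standalone sketch of the same pattern, independent of Spark; the 2-second interval and the sleep are illustrative:
```py
import faulthandler
import sys
import time

# Dump every thread's traceback to stderr every 2 seconds until cancelled;
# repeat=True re-arms the timer after each dump.
faulthandler.dump_traceback_later(2, repeat=True, file=sys.stderr)
try:
    time.sleep(5)  # stand-in for a slow task; expect ~2 "Timeout (0:00:02)!" dumps
finally:
    faulthandler.cancel_dump_traceback_later()  # stop the periodic dumps
```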
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 48feb26d653b..c921f9d9c08b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3549,6 +3549,14 @@ object SQLConf {
.version("4.1.0")
.fallbackConf(Python.PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
+ val PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+ buildConf("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds")
+ .doc(
+ s"Same as ${Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS.key} " +
+ "for Python execution with DataFrame and SQL. It can change during runtime.")
+ .version("4.1.0")
+ .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
val PYSPARK_PLOT_MAX_ROWS =
buildConf("spark.sql.pyspark.plotting.max_rows")
.doc("The visual limit on plots. If set to 1000 for top-n-based plots
(pie, bar, barh), " +
@@ -6731,6 +6739,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def pythonUDFWorkerKillOnIdleTimeout: Boolean = getConf(PYTHON_UDF_WORKER_KILL_ON_IDLE_TIMEOUT)
+ def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
+ getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
def pythonUDFArrowConcurrencyLevel: Option[Int] = getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
def pythonUDFArrowFallbackOnUDT: Boolean = getConf(PYTHON_UDF_ARROW_FALLBACK_ON_UDT)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 9a9fb574b87f..555be307cd81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -49,6 +49,8 @@ abstract class BaseArrowPythonRunner(
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
override val errorOnDuplicatedFieldNames: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index ae875c777b43..86136e444d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -58,6 +58,8 @@ class ArrowPythonUDTFRunner(
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
override val errorOnDuplicatedFieldNames: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 27d6f7dc1c66..8b160accd7a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -62,6 +62,8 @@ class CoGroupedArrowPythonRunner(
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 4baddcd4d9e7..3f30519e9521 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -48,6 +48,8 @@ abstract class BasePythonUDFRunner(
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
override val bufferSize: Int = SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
override val batchSizeForPythonUDF: Int =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 0de937df05f4..6f2c1a986c27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -78,6 +78,8 @@ class ApplyInPandasWithStatePythonRunner(
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
private val sqlConf = SQLConf.get
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 04c51c859bac..01643af9cf30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -102,6 +102,8 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
override val faultHandlerEnabled: Boolean = SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
override val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+ override val tracebackDumpIntervalSeconds: Long =
+ SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback