This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a8859d24763 [SPARK-52579][PYTHON] Set periodical traceback dump for 
Python workers
1a8859d24763 is described below

commit 1a8859d247635a22573311b3b935fed0ae3f10b1
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Jun 26 12:50:50 2025 +0900

    [SPARK-52579][PYTHON] Set periodical traceback dump for Python workers
    
    ### What changes were proposed in this pull request?
    
    Sets up a periodic traceback dump for Python workers.
    
    To enable:
    - `spark.python.worker.tracebackDumpIntervalSeconds` (SparkConf, default 
`0`)
      > The interval (in seconds) for Python workers to dump their tracebacks. 
If it's positive, the Python worker will periodically dump the traceback into 
its executor's `stderr`. The default is `0`, which means it is disabled.
    - `spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds` (SQLConf, 
fallback to the above)
      > Same as spark.python.worker.tracebackDumpIntervalSeconds for Python 
execution with DataFrame and SQL. It can change during runtime.
    
    ### Why are the changes needed?
    
    To monitor the Python worker progress.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the traceback will be dumped periodically when the config is set to a 
positive number.
    
    ```py
    >>> from pyspark.sql.functions import *
    >>> import time
    >>>
    >>> @udf("long")
    ... def f(x):
    ...     time.sleep(12)
    ...     return x
    ...
    >>> df = spark.range(1).select(f(col("id")))
    
    >>> spark.conf.set('spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds', 5)
    >>>
    >>> df.show()
    Timeout (0:00:05)!
    Thread 0x00000001ede60f80 (most recent call first):
      File "<stdin>", line 3 in f
      File "/.../python/pyspark/util.py", line 135 in wrapper
      File "/.../python/pyspark/worker.py", line 121 in <lambda>
    ...
    
    Timeout (0:00:05)!
    Thread 0x00000001ede60f80 (most recent call first):
      File "<stdin>", line 3 in f
      File "/.../python/pyspark/util.py", line 135 in wrapper
      File "/.../python/pyspark/worker.py", line 121 in <lambda>
    ...
    
    +-----+
    |f(id)|
    +-----+
    |    0|
    +-----+
    ```
    
    ### How was this patch tested?
    
    Manually, and existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51286 from ueshin/issues/SPARK-52579/traceback.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala |  5 +++++
 .../main/scala/org/apache/spark/internal/config/Python.scala  | 10 ++++++++++
 python/pyspark/worker.py                                      |  9 +++++++++
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++++++++
 .../apache/spark/sql/execution/python/ArrowPythonRunner.scala |  2 ++
 .../spark/sql/execution/python/ArrowPythonUDTFRunner.scala    |  2 ++
 .../sql/execution/python/CoGroupedArrowPythonRunner.scala     |  2 ++
 .../apache/spark/sql/execution/python/PythonUDFRunner.scala   |  2 ++
 .../python/streaming/ApplyInPandasWithStatePythonRunner.scala |  2 ++
 .../sql/execution/python/streaming/PythonForeachWriter.scala  |  2 ++
 10 files changed, 47 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 287dc8694228..3ef55ce9def0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -170,6 +170,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val faultHandlerEnabled: Boolean = 
conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
   protected val idleTimeoutSeconds: Long = 
conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
   protected val killOnIdleTimeout: Boolean = 
conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
+  protected val tracebackDumpIntervalSeconds: Long =
+    conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
 
@@ -267,6 +269,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (faultHandlerEnabled) {
       envVars.put("PYTHON_FAULTHANDLER_DIR", faultHandlerLogDir.toString)
     }
+    if (tracebackDumpIntervalSeconds > 0L) {
+      envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", 
tracebackDumpIntervalSeconds.toString)
+    }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 46d54be92f3d..8c3adedb372a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -117,4 +117,14 @@ private[spark] object Python {
     .version("4.1.0")
     .booleanConf
     .createWithDefault(false)
+
+  val PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+    ConfigBuilder("spark.python.worker.tracebackDumpIntervalSeconds")
+      .doc("The interval (in seconds) for Python workers to dump their 
tracebacks. " +
+        "If it's positive, the Python worker will periodically dump the 
traceback into " +
+        "its executor's `stderr`. The default is `0` that means it is 
disabled.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "The interval should be 0 or positive.")
+      .createWithDefault(0)
 }
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 67cf25cff6e6..3454c9855a58 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2347,6 +2347,9 @@ def read_udfs(pickleSer, infile, eval_type):
 
 def main(infile, outfile):
     faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+    tracebackDumpIntervalSeconds = 
os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
+    if tracebackDumpIntervalSeconds is not None:
+        tracebackDumpIntervalSeconds = int(tracebackDumpIntervalSeconds)
     try:
         if faulthandler_log_path:
             faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
@@ -2358,6 +2361,9 @@ def main(infile, outfile):
         if split_index == -1:  # for unit tests
             sys.exit(-1)
 
+        if tracebackDumpIntervalSeconds is not None and 
tracebackDumpIntervalSeconds > 0:
+            faulthandler.dump_traceback_later(tracebackDumpIntervalSeconds, 
repeat=True)
+
         check_python_version(infile)
 
         # read inputs only for a barrier task
@@ -2465,6 +2471,9 @@ def main(infile, outfile):
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
         sys.exit(-1)
 
+    # Force to cancel dump_traceback_later
+    faulthandler.cancel_dump_traceback_later()
+
 
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the 
environment.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 48feb26d653b..c921f9d9c08b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3549,6 +3549,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(Python.PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
 
+  val PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS =
+    buildConf("spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds")
+      .doc(
+        s"Same as ${Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS.key} 
" +
+          "for Python execution with DataFrame and SQL. It can change during 
runtime.")
+      .version("4.1.0")
+      .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
   val PYSPARK_PLOT_MAX_ROWS =
     buildConf("spark.sql.pyspark.plotting.max_rows")
       .doc("The visual limit on plots. If set to 1000 for top-n-based plots 
(pie, bar, barh), " +
@@ -6731,6 +6739,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def pythonUDFWorkerKillOnIdleTimeout: Boolean = 
getConf(PYTHON_UDF_WORKER_KILL_ON_IDLE_TIMEOUT)
 
+  def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
+    getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+
   def pythonUDFArrowConcurrencyLevel: Option[Int] = 
getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
 
   def pythonUDFArrowFallbackOnUDT: Boolean = 
getConf(PYTHON_UDF_ARROW_FALLBACK_ON_UDT)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 9a9fb574b87f..555be307cd81 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -49,6 +49,8 @@ abstract class BaseArrowPythonRunner(
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index ae875c777b43..86136e444d43 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -58,6 +58,8 @@ class ArrowPythonUDTFRunner(
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 27d6f7dc1c66..8b160accd7a4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -62,6 +62,8 @@ class CoGroupedArrowPythonRunner(
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 4baddcd4d9e7..3f30519e9521 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -48,6 +48,8 @@ abstract class BasePythonUDFRunner(
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   override val bufferSize: Int = 
SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
   override val batchSizeForPythonUDF: Int =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 0de937df05f4..6f2c1a986c27 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -78,6 +78,8 @@ class ApplyInPandasWithStatePythonRunner(
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
   override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+  override val tracebackDumpIntervalSeconds: Long =
+    SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
   private val sqlConf = SQLConf.get
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 04c51c859bac..01643af9cf30 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -102,6 +102,8 @@ class PythonForeachWriter(func: PythonFunction, schema: 
StructType)
       override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
       override val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
       override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
+      override val tracebackDumpIntervalSeconds: Long =
+        SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
 
       override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
       override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to