This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3115669444e0 [SPARK-54925][PYTHON] Add the capability to dump threads for pyspark
3115669444e0 is described below
commit 3115669444e09a3a3096112ecb296bad2606b05d
Author: Tian Gao <[email protected]>
AuthorDate: Thu Jan 8 09:26:08 2026 +0800
[SPARK-54925][PYTHON] Add the capability to dump threads for pyspark
### What changes were proposed in this pull request?
Add an optional capability to dump thread info of *all* pyspark processes.
It is intentionally hidden for now because it is not fully polished. It can be used
as `python -m pyspark.threaddump -p <pid>` and requires `pystack` and `psutil`;
without these libraries the command fails.
For now it is only used when a test hangs. The output looks like:
```
Thread dump:
Dumping threads for process 1904
Traceback for thread 2175 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 240, in serve_forever
        self._handle_request_noblock()
    (Python) File "/usr/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
        self.process_request(request, client_address)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 349, in process_request
        self.finish_request(request, client_address)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 362, in finish_request
        self.RequestHandlerClass(request, client_address, self)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 766, in __init__
        self.handle()
    (Python) File "/workspaces/spark/python/pyspark/accumulators.py", line 327, in handle
        poll(accum_updates)
    (Python) File "/workspaces/spark/python/pyspark/accumulators.py", line 281, in poll
        for fd, event in poller.poll(1000):
Traceback for thread 2034 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/workspaces/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 58, in run
Traceback for thread 1904 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File "/workspaces/spark/python/pyspark/sql/tests/test_udf.py", line 1790, in <module>
        unittest.main(testRunner=testRunner, verbosity=2)
    (Python) File "/workspaces/spark/python/pyspark/testing/__init__.py", line 30, in unittest_main
        res = _unittest_main(*args, **kwargs)
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 105, in __init__
        self.runTests()
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 281, in runTests
        self.result = testRunner.run(self.test)
    (Python) File "/usr/local/lib/python3.12/dist-packages/xmlrunner/runner.py", line 67, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 690, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 634, in run
        self._callTestMethod(testMethod)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 589, in _callTestMethod
        if method() is not None:
    (Python) File "/workspaces/spark/python/pyspark/sql/tests/test_udf.py", line 212, in test_chained_udf
        [row] = self.spark.sql("SELECT double_int(double_int(1) + 1)").collect()
    (Python) File "/workspaces/spark/python/pyspark/sql/classic/dataframe.py", line 469, in collect
        sock_info = self._jdf.collectToPython()
    (Python) File "/workspaces/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1361, in __call__
    (Python) File "/workspaces/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1038, in send_command
    (Python) File "/workspaces/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 535, in send_command
    (Python) File "/usr/lib/python3.12/socket.py", line 720, in readinto
        return self._sock.recv_into(b)
Dumping threads for process 2191
Traceback for thread 2191 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 287, in <module>
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 180, in manager
Dumping threads for process 2198
Traceback for thread 2198 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 287, in <module>
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 259, in manager
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 88, in worker
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/util.py", line 981, in wrapper
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3439, in main
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 583, in read_int
Dumping threads for process 2257
Traceback for thread 2257 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 287, in <module>
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 259, in manager
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 88, in worker
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/util.py", line 981, in wrapper
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3439, in main
    (Python) File "/workspaces/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 583, in read_int
```
Notice that it covers not only the driver process, but also the daemon and worker
processes. The plan is to incorporate this into our existing debug framework so
that the `threaddump` button will return both JVM executor threads and Python worker
threads.
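For reference, a minimal usage sketch (hypothetical, not part of this patch) of driving the hidden entry point against a hung driver from a separate shell; `hung_pid` is a placeholder, and it assumes a Linux host with `pystack` and `psutil` installed:
```python
# Hypothetical sketch: dump the Python stacks of a hung PySpark driver and its
# daemon/worker children. Assumes Linux with `pystack` and `psutil` installed;
# `hung_pid` is a placeholder for the driver PID found via e.g. `ps`.
import subprocess
import sys

hung_pid = 1904  # placeholder PID of the hung driver process
result = subprocess.run(
    [sys.executable, "-m", "pyspark.threaddump", "-p", str(hung_pid)],
    capture_output=True,
    text=True,
)
# The module exits non-zero if pystack/psutil are missing or the PID is invalid.
print(result.stdout)
```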
### Why are the changes needed?
We need insight into the Python worker/daemon processes.
We have some thread dump capability in our tests, but it is not stable:
`SIGTERM` is sometimes hooked, so `faulthandler` cannot work properly, and it
cannot dump subprocesses.
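For context, the in-process approach looks roughly like the sketch below (an illustration under an assumed signal choice, not the exact harness code); it can only dump the process that registered the handler, and only if the signal actually reaches it:
```python
# Illustrative sketch of an in-process faulthandler dump (assumed setup, not
# the exact test-harness code). It only covers this process's threads, and a
# handler installed elsewhere for the same signal can prevent it from running.
import faulthandler
import signal
import sys

faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)
# From another shell: `kill -USR1 <pid>` dumps this process's threads only;
# daemon and worker subprocesses are not covered.
```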
### Does this PR introduce _any_ user-facing change?
Yes, but it's hidden for now. A new command entry is introduced.
### How was this patch tested?
It works locally.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53705 from gaogaotiantian/python-threaddump.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
dev/requirements.txt | 2 +
dev/spark-test-image/python-311/Dockerfile | 2 +-
python/pyspark/threaddump.py | 62 ++++++++++++++++++++++++++++++
python/run-tests.py | 15 +++++++-
4 files changed, 79 insertions(+), 2 deletions(-)
diff --git a/dev/requirements.txt b/dev/requirements.txt
index 7eb157352408..a64f9c4cc50a 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -77,6 +77,8 @@ graphviz==0.20.3
flameprof==0.4
viztracer
debugpy
+pystack>=1.5.1; python_version!='3.13' and sys_platform=='linux' # no 3.13t wheels
+psutil
# TorchDistributor dependencies
torch
diff --git a/dev/spark-test-image/python-311/Dockerfile b/dev/spark-test-image/python-311/Dockerfile
index 50c042c9da1a..0db52262fa84 100644
--- a/dev/spark-test-image/python-311/Dockerfile
+++ b/dev/spark-test-image/python-311/Dockerfile
@@ -68,7 +68,7 @@ RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*
-ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
+ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2 pystack psutil"
# Python deps for Spark Connect
ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"
diff --git a/python/pyspark/threaddump.py b/python/pyspark/threaddump.py
new file mode 100644
index 000000000000..0cf0feb81096
--- /dev/null
+++ b/python/pyspark/threaddump.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import argparse
+import sys
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Dump threads of a process and its children")
+    parser.add_argument("-p", "--pid", type=int, required=True, help="The PID to dump")
+    return parser
+
+
+def main() -> int:
+    try:
+        import psutil
+        from pystack.__main__ import main as pystack_main  # type: ignore
+    except ImportError:
+        print("pystack and psutil are not installed")
+        return 1
+
+    parser = build_parser()
+    args = parser.parse_args()
+
+    try:
+        pids = [args.pid] + [
+            child.pid
+            for child in psutil.Process(args.pid).children(recursive=True)
+            if "python" in child.exe()
+        ]
+    except Exception as e:
+        print(f"Error getting children of process {args.pid}: {e}")
+        return 2
+
+    for pid in pids:
+        sys.argv = ["pystack", "remote", str(pid)]
+        try:
+            print(f"Dumping threads for process {pid}")
+            pystack_main()
+        except Exception:
+            # We might have tried to dump a process that is not a Python process
+            pass
+
+    return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/python/run-tests.py b/python/run-tests.py
index b3522a13df4a..2676a67d9687 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -113,7 +113,7 @@ class TestRunner:
try:
asyncio.run(self.handle_inout())
except subprocess.TimeoutExpired:
- LOGGER.error(f"Test {self.test_name} timed out")
+ LOGGER.error(f"Test {self.test_name} timed out after {self.timeout} seconds")
try:
return self.p.wait(timeout=30)
except subprocess.TimeoutExpired:
@@ -204,9 +204,22 @@ class TestRunner:
# We don't want to kill the process if it's in pdb mode
return
if self.p.poll() is None:
+ if sys.platform == "linux":
+ self.thread_dump(self.p.pid)
self.p.terminate()
raise subprocess.TimeoutExpired(self.cmd, self.timeout)
+ def thread_dump(self, pid):
+ pyspark_python = self.env['PYSPARK_PYTHON']
+ p = subprocess.run(
+ [pyspark_python, "-m", "pyspark.threaddump", "-p", str(pid)],
+ env={**self.env, "PYTHONPATH": f"{os.path.join(SPARK_HOME, 'python')}:{os.environ.get('PYTHONPATH', '')}"},
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ if p.returncode == 0:
+ LOGGER.error(f"Thread dump:\n{p.stdout.decode('utf-8')}")
+
def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_output):
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]