This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5a7e96c05f4c [SPARK-54711][PYTHON] Add a timeout for daemon created
worker connection
5a7e96c05f4c is described below
commit 5a7e96c05f4c2173ee70a74100580462a79fcc0c
Author: Tian Gao <[email protected]>
AuthorDate: Wed Dec 17 14:09:46 2025 -0800
[SPARK-54711][PYTHON] Add a timeout for daemon created worker connection
### What changes were proposed in this pull request?
A timeout is introduced for daemon-based workers. If the spawned worker does
not connect back to the executor within 60 seconds, the executor restarts the
daemon and retries worker creation.
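
As an illustration of the technique only, the wait-with-timeout step can be sketched in isolation as below. This is a minimal sketch, not the code in `PythonWorkerFactory`: the object name `ReadWithTimeout`, the helpers `awaitReadable` and `runDemo`, the loopback server standing in for the Python side, and the 200 ms / 2000 ms timeouts are all assumptions made for the example.

```scala
import java.io.DataInputStream
import java.net.{InetAddress, InetSocketAddress, SocketTimeoutException}
import java.nio.ByteBuffer
import java.nio.channels.{Channels, SelectionKey, Selector, ServerSocketChannel, SocketChannel}

// Hypothetical standalone demo; names and timeouts are illustrative only.
object ReadWithTimeout {

  /** Waits up to `timeoutMs` (must be > 0) for `channel` to become readable. */
  def awaitReadable(channel: SocketChannel, timeoutMs: Long): Unit = {
    val selector = Selector.open()
    try {
      // A channel must be non-blocking while it is registered with a selector.
      channel.configureBlocking(false)
      channel.register(selector, SelectionKey.OP_READ)
      // select() returns the number of ready keys; 0 means the timeout elapsed.
      if (selector.select(timeoutMs) == 0) {
        throw new SocketTimeoutException(s"No data from peer after $timeoutMs ms")
      }
    } finally {
      // Closing the selector deregisters the channel.
      selector.close()
    }
    // Safe only after deregistration; later stream reads block as usual.
    channel.configureBlocking(true)
  }

  /** Returns (did the silent peer time out, the int read once the peer wrote). */
  def runDemo(): (Boolean, Int) = {
    val loopback = InetAddress.getLoopbackAddress
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(loopback, 0)) // port 0: pick any free port
    val client = SocketChannel.open(
      new InetSocketAddress(loopback, server.socket().getLocalPort))
    val peer = server.accept()
    try {
      // Case 1: the peer stays silent, so the wait times out.
      val timedOut =
        try { awaitReadable(client, 200L); false }
        catch { case _: SocketTimeoutException => true }

      // Case 2: the peer writes a 4-byte int, so the wait returns and the read succeeds.
      val buf = ByteBuffer.allocate(4)
      buf.putInt(1234)
      buf.flip()
      peer.write(buf)
      awaitReadable(client, 2000L)
      val value = new DataInputStream(Channels.newInputStream(client)).readInt()
      (timedOut, value)
    } finally {
      peer.close()
      client.close()
      server.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (timedOut, value) = runDemo()
    println(s"timed out: $timedOut, value: $value")
  }
}
```

Closing the selector before switching the channel back to blocking mode matters here, because `configureBlocking(true)` throws `IllegalBlockingModeException` while the channel is still registered with a selector.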
### Why are the changes needed?
Without this timeout, a worker that hangs before it establishes the
connection with the executor causes the executor to hang forever.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
It is confirmed to have fixed a daemon worker hanging issue.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53476 from gaogaotiantian/timeout-for-daemon.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
.../spark/api/python/PythonWorkerFactory.scala | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index f3e364a4be5e..350818e18cb9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -168,6 +168,22 @@ private[spark] class PythonWorkerFactory(
} else {
SocketChannel.open(new InetSocketAddress(daemonHost, daemonPort))
}
+
+ val serverSelector = Selector.open()
+ try {
+ val timeOutMs = 60 * 1000
+ socketChannel.configureBlocking(false)
+ socketChannel.register(serverSelector, SelectionKey.OP_READ)
+ if (serverSelector.select(timeOutMs) == 0) {
+ throw new SocketTimeoutException(
+ s"Timed out while waiting for the Python worker to connect back after $timeOutMs ms"
+ )
+ }
+ } finally {
+ serverSelector.close()
+ }
+ socketChannel.configureBlocking(true)
+
// These calls are blocking.
val pid = new DataInputStream(Channels.newInputStream(socketChannel)).readInt()
if (pid < 0) {
@@ -197,6 +213,12 @@ private[spark] class PythonWorkerFactory(
stopDaemon()
startDaemon()
createWorker()
+ case exc: SocketTimeoutException =>
+ logWarning(exc.toString)
+ logWarning("Lost connection to Python daemon, attempting to restart")
+ stopDaemon()
+ startDaemon()
+ createWorker()
}
}
}