This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a7e96c05f4c [SPARK-54711][PYTHON] Add a timeout for daemon created worker connection
5a7e96c05f4c is described below

commit 5a7e96c05f4c2173ee70a74100580462a79fcc0c
Author: Tian Gao <[email protected]>
AuthorDate: Wed Dec 17 14:09:46 2025 -0800

    [SPARK-54711][PYTHON] Add a timeout for daemon created worker connection
    
    ### What changes were proposed in this pull request?
    
    A timeout mechanism is introduced for daemon-based workers. If the executor does not receive a response from the spawned worker within 60 seconds, it restarts the daemon and retries creating the worker.
    
    ### Why are the changes needed?
    
    Without this mechanism, if the worker hangs before it can establish the connection with the executor, the executor hangs forever.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Confirmed that this change fixes a hang observed with daemon-based workers.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53476 from gaogaotiantian/timeout-for-daemon.
    
    Authored-by: Tian Gao <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 .../spark/api/python/PythonWorkerFactory.scala     | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index f3e364a4be5e..350818e18cb9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -168,6 +168,22 @@ private[spark] class PythonWorkerFactory(
       } else {
         SocketChannel.open(new InetSocketAddress(daemonHost, daemonPort))
       }
+
+      val serverSelector = Selector.open()
+      try {
+        val timeOutMs = 60 * 1000
+        socketChannel.configureBlocking(false)
+        socketChannel.register(serverSelector, SelectionKey.OP_READ)
+        if (serverSelector.select(timeOutMs) == 0) {
+          throw new SocketTimeoutException(
+            s"Timed out while waiting for the Python worker to connect back 
after $timeOutMs ms"
+          )
+        }
+      } finally {
+        serverSelector.close()
+      }
+      socketChannel.configureBlocking(true)
+
       // These calls are blocking.
       val pid = new 
DataInputStream(Channels.newInputStream(socketChannel)).readInt()
       if (pid < 0) {
@@ -197,6 +213,12 @@ private[spark] class PythonWorkerFactory(
           stopDaemon()
           startDaemon()
           createWorker()
+        case exc: SocketTimeoutException =>
+          logWarning(exc.toString)
+          logWarning("Lost connection to Python daemon, attempting to restart")
+          stopDaemon()
+          startDaemon()
+          createWorker()
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to