This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new daa6f59399b8 [SPARK-57191][YARN] Fix driver hang when MonitorThread 
encounters unexpected exception
daa6f59399b8 is described below

commit daa6f59399b84295d7d04d178564f504c58eb2d8
Author: shrirangmhalgi <[email protected]>
AuthorDate: Thu Jun 4 19:56:06 2026 +0900

    [SPARK-57191][YARN] Fix driver hang when MonitorThread encounters 
unexpected exception
    
    ### What changes were proposed in this pull request?
    In YARN client mode, `YarnClientSchedulerBackend`'s `MonitorThread` only 
catches `InterruptedException` / `InterruptedIOException`. If any other 
exception occurs during application monitoring (e.g., network failure, 
credential expiration, or other runtime errors), the thread dies silently. 
Since the driver JVM has active non-daemon threads (SparkUI, heartbeats), the 
process hangs indefinitely in a zombie state.
    
    This patch adds a `NonFatal` catch clause that logs the error and calls 
`sc.stop(1)`, so the driver shuts down instead of hanging. If 
`AM_CLIENT_MODE_EXIT_ON_ERROR` is enabled, the driver process then exits with 
code 1.
    
    ### Why are the changes needed?
    In managed environments (cloud platform agents, workflow schedulers), a 
hung driver is indistinguishable from one doing legitimate post-execution work. 
This causes resource leakage, orphaned processes, and extended job timeout 
durations.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Previously, certain failures in the monitor thread caused the driver 
to hang forever. Now the driver logs the error and stops the SparkContext with 
exit code 1.
    
    ### How was this patch tested?
    Added a new suite, `YarnClientSchedulerBackendSuite`, with a test that 
mocks `Client.monitorApplication` to throw a `RuntimeException` and asserts 
that `sc.stop()` is called (observed via `SparkListener.onApplicationEnd`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes.
    
    Closes #56274 from shrirangmhalgi/SPARK-57191-yarn-driver-hang.
    
    Authored-by: shrirangmhalgi <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
    (cherry picked from commit d9ed8434b97905b0dfd506aea2bb56cc85c92724)
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../cluster/YarnClientSchedulerBackend.scala       | 10 +++
 .../cluster/YarnClientSchedulerBackendSuite.scala  | 73 ++++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9389a13e292f..3018cb8ed739 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.io.InterruptedIOException
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, 
YarnApplicationState}
 
@@ -135,6 +136,15 @@ private[spark] class YarnClientSchedulerBackend(
       } catch {
         case _: InterruptedException | _: InterruptedIOException =>
           logInfo("Interrupting monitor thread")
+        case NonFatal(e) =>
+          logError(log"Unexpected error in YARN application state monitor 
thread.", e)
+          allowInterrupt = false
+          sc.stop(1)
+          if (conf.get(AM_CLIENT_MODE_EXIT_ON_ERROR)) {
+            logWarning(log"SparkContext stopped due to unexpected error, " +
+              log"exiting with code 1.")
+            System.exit(1)
+          }
       }
     }
 
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackendSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackendSuite.scala
new file mode 100644
index 000000000000..da231bd2d707
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackendSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.mockito.ArgumentMatchers.{anyBoolean, anyLong}
+import org.mockito.Mockito.{mock, when}
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.Client
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, 
TaskSchedulerImpl}
+
+class YarnClientSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext {
+
+  test("SPARK-57191: MonitorThread calls sc.stop() on unexpected exception") {
+    val stopCalled = new CountDownLatch(1)
+    sc = new SparkContext("local", "test", new 
SparkConf().set("spark.testing", "true"))
+    sc.addSparkListener(new SparkListener {
+      override def onApplicationEnd(e: SparkListenerApplicationEnd): Unit =
+        stopCalled.countDown()
+    })
+
+    val backend = new YarnClientSchedulerBackend(
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc)
+
+    // Simulate MonitorThread hitting an unexpected non-fatal error
+    // (e.g., credential expiry, network failure)
+    val mockClient = mock(classOf[Client])
+    when(mockClient.monitorApplication(anyBoolean(), anyBoolean(), anyLong()))
+      .thenThrow(new RuntimeException("Simulated failure"))
+
+    // Use reflection since client/appId are private and MonitorThread is an 
inner class
+    val clientField = 
backend.getClass.getDeclaredFields.find(_.getName.endsWith("client")).get
+    clientField.setAccessible(true)
+    clientField.set(backend, mockClient)
+
+    val appIdField = classOf[YarnSchedulerBackend].getDeclaredField("appId")
+    appIdField.setAccessible(true)
+    appIdField.set(backend,
+      Some(org.apache.hadoop.yarn.api.records.ApplicationId.newInstance(0L, 
1)))
+
+    val monitorMethod = 
backend.getClass.getDeclaredMethod("asyncMonitorApplication")
+    monitorMethod.setAccessible(true)
+    val monitorThread = monitorMethod.invoke(backend).asInstanceOf[Thread]
+
+    // Assign to backend.monitorThread so the full shutdown path is exercised:
+    // sc.stop() -> YarnClientSchedulerBackend.stop() -> 
monitorThread.stopMonitor()
+    val monitorField = backend.getClass.getDeclaredFields
+      .find(_.getName.endsWith("monitorThread")).get
+    monitorField.setAccessible(true)
+    monitorField.set(backend, monitorThread)
+
+    monitorThread.start()
+
+    assert(stopCalled.await(10, TimeUnit.SECONDS),
+      "sc.stop() was not called after MonitorThread hit an unexpected 
exception")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to