This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ba0ed11eda [SPARK-53564][CORE] Avoid DAGScheduler exits due to 
blockManager RPC timeout in DAGSchedulerEventProcessLoop
26ba0ed11eda is described below

commit 26ba0ed11edaa2df0086d1fe73803051feda4fae
Author: Tengfei Huang <[email protected]>
AuthorDate: Thu Oct 9 19:19:22 2025 +0800

    [SPARK-53564][CORE] Avoid DAGScheduler exits due to blockManager RPC 
timeout in DAGSchedulerEventProcessLoop
    
    ### What changes were proposed in this pull request?
    Currently there are a few blocking RPC requests used in
    `DAGSchedulerEventProcessLoop` (a simplified sketch of this blocking
    pattern follows the list):
    1. 
[blockManagerMaster.getLocations(blockIds)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L451)
 used in `getCacheLocs` to get rdd cache locations.
    2. 
[blockManagerMaster.removeShufflePushMergerLocation](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2754)
 used in `removeExecutorAndUnregisterOutputs`.
    3. 
[blockManagerMaster.removeExecutor(execId)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2765)
 used in `removeExecutorAndUnregisterOutputs`.
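
    For context, here is a minimal, self-contained sketch of why a blocking
    call like this is risky inside a single-threaded event loop. It uses
    plain `scala.concurrent` stand-ins rather than Spark's actual
    `RpcEndpointRef` API, so the `ask`/`askSync` methods below are
    illustrative only:

    ```scala
    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration._

    object BlockingAskSketch {
      // Stand-in for a driver endpoint that never answers, e.g. a busy
      // BlockManagerMasterEndpoint whose inbox is backed up.
      def ask(message: Any): Future[Seq[String]] =
        Promise[Seq[String]]().future

      // Stand-in for askSync: block the calling thread until the reply
      // arrives or the timeout fires. Spark wraps the TimeoutException in
      // an RpcTimeoutException, but either way it propagates to the caller,
      // which here is the single event-loop thread.
      def askSync(message: Any, timeout: FiniteDuration): Seq[String] =
        Await.result(ask(message), timeout)

      def main(args: Array[String]): Unit = {
        try {
          askSync("GetLocations", 100.millis)
        } catch {
          case e: TimeoutException =>
            // Left unhandled, this exception would escape the event loop;
            // DAGSchedulerEventProcessLoop treats such errors as fatal.
            println(s"ask timed out: $e")
        }
      }
    }
    ```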
    
    An `RpcTimeoutException` can be thrown when slow events block the
    `BlockManagerMasterEndpoint`, and that exception is not handled in
    `DAGSchedulerEventProcessLoop`. Once this happens, the DAGScheduler exits
    and the application crashes.
    
    This PR proposes to catch and handle the `RpcTimeoutException` properly
    instead of crashing the application. There are 2 scenarios (a minimal
    sketch of both patterns follows this list):
    1. Change the requests in `removeExecutorAndUnregisterOutputs` to be
    async, since we don't rely on the response, and let
    `BlockManagerMasterEndpoint` deal with any errors.
    2. Abort the stage if an RPC timeout happens during `submitStage`.
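
    Below is a simplified, self-contained sketch of the two handling
    patterns, again using plain `scala.concurrent` stand-ins rather than
    Spark's RPC classes. Scenario 1 sends the request without blocking and
    only logs asynchronous failures; scenario 2 catches the timeout around
    a blocking call and falls back to a safe default (the `getCacheLocs`
    change in the diff below uses "assume not fully cached" as its
    fallback):

    ```scala
    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, ExecutionContext, Future, Promise}
    import scala.concurrent.duration._
    import scala.util.Failure

    object TimeoutHandlingSketch {
      implicit val ec: ExecutionContext = ExecutionContext.global

      // Endpoint stand-in that never replies, so blocking waits time out.
      def ask(message: Any): Future[Boolean] = Promise[Boolean]().future

      def main(args: Array[String]): Unit = {
        // Scenario 1: fire-and-forget. Send the request without blocking
        // the event loop and log failures on the callback thread instead.
        ask("RemoveExecutor(exec-1)").onComplete {
          case Failure(e) => println(s"removeExecutor failed: $e")
          case _          => ()
        }

        // Scenario 2: keep the blocking call but handle the timeout and
        // fall back to a safe answer ("assume the RDD is not cached").
        val hasUncachedPartitions =
          try {
            Await.result(ask("GetLocations"), 100.millis)
          } catch {
            case e: TimeoutException =>
              println(s"getCacheLocs timed out, assuming uncached: $e")
              true
          }
        println(s"hasUncachedPartitions = $hasUncachedPartitions")
      }
    }
    ```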
    
    ### Why are the changes needed?
    Avoid an RPC timeout from crashing the Spark application.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52335 from ivoson/SPARK-53564.
    
    Authored-by: Tengfei Huang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/rpc/RpcTimeout.scala    |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 14 +++-
 .../apache/spark/storage/BlockManagerMaster.scala  |  5 +-
 .../spark/storage/BlockManagerMasterEndpoint.scala |  9 ++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 88 +++++++++++++++++++---
 5 files changed, 99 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 770ae2f1dd22..da23f2abd3ac 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 /**
  * An exception thrown if RpcTimeout modifies a `TimeoutException`.
  */
-private[rpc] class RpcTimeoutException(message: String, cause: 
TimeoutException)
+private[spark] class RpcTimeoutException(message: String, cause: 
TimeoutException)
   extends TimeoutException(message) { initCause(cause) }
 
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3b719a2c7d24..7d77628c3f08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -49,6 +49,7 @@ import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
 import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, 
EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
 import org.apache.spark.rpc.RpcTimeout
+import org.apache.spark.rpc.RpcTimeoutException
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
@@ -771,7 +772,14 @@ private[spark] class DAGScheduler(
     def visit(rdd: RDD[_]): Unit = {
       if (!visited(rdd)) {
         visited += rdd
-        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
+        val rddHasUncachedPartitions = try {
+          getCacheLocs(rdd).contains(Nil)
+        } catch {
+          case e: RpcTimeoutException =>
+            logWarning(log"Failed to get cache locations for RDD ${MDC(RDD_ID, 
rdd.id)} due " +
+              log"to rpc timeout, assuming not fully cached.", e)
+            true
+        }
         if (rddHasUncachedPartitions) {
           for (dep <- rdd.dependencies) {
             dep match {
@@ -1463,7 +1471,7 @@ private[spark] class DAGScheduler(
           abortStage(stage, reason, None)
         } else {
           val missing = getMissingParentStages(stage).sortBy(_.id)
-          logDebug("missing: " + missing)
+          logInfo(log"Missing parents found for ${MDC(STAGE, stage)}: 
${MDC(MISSING_PARENT_STAGES, missing)}")
           if (missing.isEmpty) {
             logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID, 
stage.rdd)}), " +
                     log"which has no missing parents")
@@ -2807,7 +2815,7 @@ private[spark] class DAGScheduler(
         hostToUnregisterOutputs.foreach(
           host => blockManagerMaster.removeShufflePushMergerLocation(host))
       }
-      blockManagerMaster.removeExecutor(execId)
+      blockManagerMaster.removeExecutorAsync(execId)
       clearCacheLocs()
     }
     if (fileLost) {
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 16c2cbbe7fdc..e77b4c278574 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -167,11 +167,12 @@ class BlockManagerMaster(
 
   /**
    * Remove the host from the candidate list of shuffle push mergers. This can 
be
-   * triggered if there is a FetchFailedException on the host
+   * triggered if there is a FetchFailedException on the host. Non-blocking.
    * @param host
    */
   def removeShufflePushMergerLocation(host: String): Unit = {
-    driverEndpoint.askSync[Unit](RemoveShufflePushMergerLocation(host))
+    logInfo(log"Request to remove shuffle push merger location ${MDC(HOST, 
host)}")
+    driverEndpoint.ask[Unit](RemoveShufflePushMergerLocation(host))
   }
 
   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index f63d4a55669a..9d6539e09f45 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -530,7 +530,13 @@ class BlockManagerMasterEndpoint(
         blockManagerInfo.get(candidateBMId).foreach { bm =>
           val remainingLocations = locations.toSeq.filter(bm => bm != 
candidateBMId)
           val replicateMsg = ReplicateBlock(blockId, remainingLocations, 
maxReplicas)
-          bm.storageEndpoint.ask[Boolean](replicateMsg)
+          try {
+            bm.storageEndpoint.ask[Boolean](replicateMsg)
+          } catch {
+            case e: Exception =>
+              logWarning(log"Failed to request replication of ${MDC(BLOCK_ID, 
blockId)} " +
+                log"from ${MDC(BLOCK_MANAGER_ID, candidateBMId)}", e)
+          }
         }
       }
     }
@@ -554,6 +560,7 @@ class BlockManagerMasterEndpoint(
   private def removeExecutor(execId: String): Unit = {
     logInfo(log"Trying to remove executor ${MDC(EXECUTOR_ID, execId)} from 
BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
+    logInfo(log"Removed ${MDC(EXECUTOR_ID, execId)} successfully in 
removeExecutor")
   }
 
   /**
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c20866fda0a3..6ec0ea320eaa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.{ArrayList => JArrayList, Collections => JCollections, 
Properties}
 import java.util.concurrent.{CountDownLatch, Delayed, LinkedBlockingQueue, 
ScheduledFuture, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, 
AtomicReference}
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -44,6 +44,7 @@ import 
org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+import org.apache.spark.rpc.RpcTimeoutException
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, 
MetadataFetchFailedException}
@@ -327,7 +328,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
         }.getOrElse(Seq())
       }.toIndexedSeq
     }
-    override def removeExecutor(execId: String): Unit = {
+    override def removeExecutorAsync(execId: String): Unit = {
       // don't need to propagate to the driver, which we don't have
     }
 
@@ -751,7 +752,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     // Now the executor on hostA is lost
     runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container 
marked as failed")))
     // Executor is removed but shuffle files are not unregistered
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
 
     // The MapOutputTracker has all the shuffle files
@@ -764,9 +765,9 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     complete(taskSets(1), Seq(
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, 
"ignored"), null)
     ))
-    // blockManagerMaster.removeExecutor is not called again
+    // blockManagerMaster.removeExecutorAsync is not called again
     // but shuffle files are unregistered
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
 
     // Shuffle files for hostA-exec should be lost
@@ -1010,7 +1011,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, 
"ignored"), null)))
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -1050,7 +1051,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
         case _ => false
       }
       runEvent(ExecutorLost("hostA-exec", event))
-      verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+      verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
       if (expectFileLoss) {
         if (expectHostFileLoss) {
           verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
@@ -1085,7 +1086,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     submit(reduceRdd, Array(0))
     completeShuffleMapStageSuccessfully(0, 0, 1)
     runEvent(ExecutorLost("hostA-exec", event))
-    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
     verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 
0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
@@ -2187,7 +2188,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     // fail the third stage because hostA went down
     completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    // blockManagerMaster.removeExecutorAsync("hostA-exec")
     // have DAGScheduler try again
     scheduler.resubmitFailedStages()
     complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
@@ -2213,7 +2214,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     // pretend stage 2 failed because hostA went down
     completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    // blockManagerMaster.removeExecutorAsync("hostA-exec")
     // DAGScheduler should notice the cached copy of the second shuffle and 
try to get it rerun.
     scheduler.resubmitFailedStages()
     assertLocations(taskSets(3), Seq(Seq("hostD")))
@@ -5072,7 +5073,8 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       ExecutorExited(-100, false, "Container marked as failed")))
 
     // Shuffle push merger executor should not be removed and the shuffle 
files are not unregistered
-    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, times(0))
+      .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
     verify(mapOutputTracker,
       
times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
 
@@ -5084,7 +5086,8 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 
     // Verify that we are not removing the executor,
     // and that we are only removing the outputs on the host
-    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, times(0))
+      .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
     verify(blockManagerMaster, 
times(1)).removeShufflePushMergerLocation("hostA")
     verify(mapOutputTracker,
       times(1)).removeOutputsOnHost("hostA")
@@ -5472,6 +5475,67 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-53564: RpcTimeout while submitting stage should not fail the 
job/application") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    shuffleMapRdd.cache()
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    val callCount = new AtomicInteger(0)
+    doAnswer(invocation => {
+      if (callCount.incrementAndGet() == 1) {
+        // First call for finding missing parents, throw RpcTimeoutException
+        throw new RpcTimeoutException("RpcTimeout", null)
+      } else {
+        invocation.callRealMethod()
+      }
+    }).when(blockManagerMaster)
+      .getLocations(any(classOf[Array[BlockId]]))
+
+    submit(reduceRdd, Array(0))
+    assert(scheduler.stageIdToStage.size === 2)
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42))
+    assertDataStructuresEmpty()
+    verify(blockManagerMaster, times(2))
+      .getLocations(any(classOf[Array[BlockId]]))
+  }
+
+  test("SPARK-53564: Resubmit missing parent when failed to get cache 
locations") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
+    val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep), tracker = 
mapOutputTracker)
+    shuffleMapRdd1.cache()
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new 
HashPartitioner(1))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep1), tracker = 
mapOutputTracker)
+
+    // Cache for shuffleMapRdd1 are available
+    cacheLocations(shuffleMapRdd1.id -> 0) = Seq(makeBlockManagerId("hostA"))
+    cacheLocations(shuffleMapRdd1.id -> 1) = Seq(makeBlockManagerId("hostB"))
+
+    val callCount = new AtomicInteger(0)
+    doAnswer(invocation => {
+      if (callCount.incrementAndGet() == 1) {
+        // First call for finding missing parents, throw RpcTimeoutException
+        throw new RpcTimeoutException("RpcTimeout", null)
+      } else {
+        invocation.callRealMethod()
+      }
+    }).when(blockManagerMaster)
+      .getLocations(any(classOf[Array[BlockId]]))
+
+    submit(reduceRdd, Array(0))
+    // All 3 stages should be submitted even though caches are available for 
shuffleMapRdd1
+    // but failed to get cache locations.
+    assert(scheduler.stageIdToStage.size === 3)
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeShuffleMapStageSuccessfully(1, 0, 2)
+    completeAndCheckAnswer(taskSets(2), Seq((Success, 42)), Map(0 -> 42))
+    assertDataStructuresEmpty()
+    verify(blockManagerMaster, times(2))
+      .getLocations(any(classOf[Array[BlockId]]))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its 
preferred locations.
    * Note that this checks only the host and not the executor ID.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
