This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 26ba0ed11eda [SPARK-53564][CORE] Avoid DAGScheduler exits due to blockManager RPC timeout in DAGSchedulerEventProcessLoop
26ba0ed11eda is described below
commit 26ba0ed11edaa2df0086d1fe73803051feda4fae
Author: Tengfei Huang <[email protected]>
AuthorDate: Thu Oct 9 19:19:22 2025 +0800
[SPARK-53564][CORE] Avoid DAGScheduler exits due to blockManager RPC timeout in DAGSchedulerEventProcessLoop
### What changes were proposed in this pull request?
Currently, a few blocking RPC requests are issued from within `DAGSchedulerEventProcessLoop` (each one stalls the event-loop thread while waiting for a reply, as sketched just after this list):
1. [blockManagerMaster.getLocations(blockIds)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L451) used in `getCacheLocs` to get RDD cache locations.
2. [blockManagerMaster.removeShufflePushMergerLocation](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2754) used in `removeExecutorAndUnregisterOutputs`.
3. [blockManagerMaster.removeExecutor(execId)](https://github.com/apache/spark/blob/fbdad297f54200b686f437a6c25fd1c387d1aaa0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2765) used in `removeExecutorAndUnregisterOutputs`.
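These calls wait synchronously on the driver endpoint, so a slow `BlockManagerMasterEndpoint` stalls the event-loop thread until the RPC timeout fires. A minimal, self-contained sketch of that blocking pattern (the names below are illustrative stand-ins, not the actual Spark RPC classes):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Stand-in for a blocking ask: the calling thread (here, the DAGScheduler
// event loop) waits until the reply arrives or the RPC timeout fires.
def blockingAsk[T](reply: Future[T], timeout: FiniteDuration): T =
  Await.result(reply, timeout) // throws TimeoutException when the endpoint is slow

// If the block manager master never answers in time, the caller gets an
// exception (RpcTimeoutException in Spark) instead of a normal return value.
val neverAnswered = Promise[Seq[String]]().future
try {
  blockingAsk(neverAnswered, 100.millis)
} catch {
  case e: TimeoutException =>
    println(s"timed out waiting for the block manager master: $e")
}
```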
An `RpcTimeoutException` can be thrown when slow events block the `BlockManagerMasterEndpoint`, and this exception is not handled in the `DAGSchedulerEventProcessLoop`. Once that happens, the DAGScheduler exits.
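For context, here is a simplified, illustrative event loop (not the actual Spark `EventLoop` source): an exception that escapes the event handler falls through to an error callback, and for the DAGScheduler that path ends with the scheduler shutting down, which is how a single unhandled `RpcTimeoutException` can take the whole application with it.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy single-threaded event loop, for illustration only. An RpcTimeoutException
// thrown while handling an event is not caught by the handler, so it falls
// through to onError, which stops the loop.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(() => {
    try {
      while (!stopped) {
        val event = queue.take()
        try onReceive(event)
        catch { case t: Throwable => onError(t) } // unhandled handler exceptions land here
      }
    } catch { case _: InterruptedException => () } // woken up by stop()
  }, name)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped = true; thread.interrupt() }

  protected def onReceive(event: E): Unit

  protected def onError(t: Throwable): Unit = {
    // In the real DAGScheduler, the error path gives up and shuts the scheduler down.
    println(s"event loop '$name' failed, shutting down: $t")
    stop()
  }
}
```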
This PR proposes to catch and handle the `RpcTimeoutException` properly instead of crashing the application. There are two scenarios, sketched after this list:
1. Make the requests in `removeExecutorAndUnregisterOutputs` asynchronous, since we do not rely on the response, and let `BlockManagerMasterEndpoint` deal with any errors.
2. Abort the stage if an RPC timeout occurs during `submitStage`.
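A condensed sketch of the two patterns, using plain Scala `Future`s rather than the Spark RPC classes (the RPC stand-ins below are illustrative; the actual change is in the diff that follows):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

implicit val ec: ExecutionContext = ExecutionContext.global

// Illustrative stand-ins for the two RPCs touched by this PR.
def removeExecutorRpc(execId: String): Future[Boolean] = Future { true }
def getCacheLocsRpc(rddId: Int): Future[Seq[String]] = Future { Seq("hostA") }

// Scenario 1: fire-and-forget. The event loop no longer waits for the reply;
// a failure is only logged, mirroring the new removeExecutorAsync behaviour.
removeExecutorRpc("exec-1").onComplete {
  case Success(_) => ()
  case Failure(e) => println(s"removeExecutor failed, leaving it to the endpoint: $e")
}

// Scenario 2: the call still blocks, but a timeout is caught at the call site
// and handled there (in the diff, getCacheLocs falls back to treating the RDD
// as not fully cached) instead of escaping into the event loop.
val cacheLocs =
  try Await.result(getCacheLocsRpc(42), 100.millis)
  catch {
    case e: TimeoutException =>
      println(s"getCacheLocs timed out, assuming not cached: $e")
      Seq.empty[String]
  }
```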
### Why are the changes needed?
Avoid an RPC timeout crashing the Spark application.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52335 from ivoson/SPARK-53564.
Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/rpc/RpcTimeout.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 14 +++-
.../apache/spark/storage/BlockManagerMaster.scala | 5 +-
.../spark/storage/BlockManagerMasterEndpoint.scala | 9 ++-
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 88 +++++++++++++++++++---
5 files changed, 99 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 770ae2f1dd22..da23f2abd3ac 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
/**
* An exception thrown if RpcTimeout modifies a `TimeoutException`.
*/
-private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+private[spark] class RpcTimeoutException(message: String, cause: TimeoutException)
extends TimeoutException(message) { initCause(cause) }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3b719a2c7d24..7d77628c3f08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -49,6 +49,7 @@ import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
import org.apache.spark.rpc.RpcTimeout
+import org.apache.spark.rpc.RpcTimeoutException
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
@@ -771,7 +772,14 @@ private[spark] class DAGScheduler(
def visit(rdd: RDD[_]): Unit = {
if (!visited(rdd)) {
visited += rdd
- val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
+ val rddHasUncachedPartitions = try {
+ getCacheLocs(rdd).contains(Nil)
+ } catch {
+ case e: RpcTimeoutException =>
+ logWarning(log"Failed to get cache locations for RDD ${MDC(RDD_ID,
rdd.id)} due " +
+ log"to rpc timeout, assuming not fully cached.", e)
+ true
+ }
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
@@ -1463,7 +1471,7 @@ private[spark] class DAGScheduler(
abortStage(stage, reason, None)
} else {
val missing = getMissingParentStages(stage).sortBy(_.id)
- logDebug("missing: " + missing)
+ logInfo(log"Missing parents found for ${MDC(STAGE, stage)}:
${MDC(MISSING_PARENT_STAGES, missing)}")
if (missing.isEmpty) {
logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID,
stage.rdd)}), " +
log"which has no missing parents")
@@ -2807,7 +2815,7 @@ private[spark] class DAGScheduler(
hostToUnregisterOutputs.foreach(
host => blockManagerMaster.removeShufflePushMergerLocation(host))
}
- blockManagerMaster.removeExecutor(execId)
+ blockManagerMaster.removeExecutorAsync(execId)
clearCacheLocs()
}
if (fileLost) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 16c2cbbe7fdc..e77b4c278574 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -167,11 +167,12 @@ class BlockManagerMaster(
/**
* Remove the host from the candidate list of shuffle push mergers. This can be
- * triggered if there is a FetchFailedException on the host
+ * triggered if there is a FetchFailedException on the host. Non-blocking.
* @param host
*/
def removeShufflePushMergerLocation(host: String): Unit = {
- driverEndpoint.askSync[Unit](RemoveShufflePushMergerLocation(host))
+ logInfo(log"Request to remove shuffle push merger location ${MDC(HOST,
host)}")
+ driverEndpoint.ask[Unit](RemoveShufflePushMergerLocation(host))
}
def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index f63d4a55669a..9d6539e09f45 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -530,7 +530,13 @@ class BlockManagerMasterEndpoint(
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
- bm.storageEndpoint.ask[Boolean](replicateMsg)
+ try {
+ bm.storageEndpoint.ask[Boolean](replicateMsg)
+ } catch {
+ case e: Exception =>
+ logWarning(log"Failed to request replication of ${MDC(BLOCK_ID,
blockId)} " +
+ log"from ${MDC(BLOCK_MANAGER_ID, candidateBMId)}", e)
+ }
}
}
}
@@ -554,6 +560,7 @@ class BlockManagerMasterEndpoint(
private def removeExecutor(execId: String): Unit = {
logInfo(log"Trying to remove executor ${MDC(EXECUTOR_ID, execId)} from
BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
+ logInfo(log"Removed ${MDC(EXECUTOR_ID, execId)} successfully in
removeExecutor")
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c20866fda0a3..6ec0ea320eaa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties}
import java.util.concurrent.{CountDownLatch, Delayed, LinkedBlockingQueue, ScheduledFuture, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference}
import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -44,6 +44,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rdd.{DeterministicLevel, RDD}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+import org.apache.spark.rpc.RpcTimeoutException
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -327,7 +328,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
}.getOrElse(Seq())
}.toIndexedSeq
}
- override def removeExecutor(execId: String): Unit = {
+ override def removeExecutorAsync(execId: String): Unit = {
// don't need to propagate to the driver, which we don't have
}
@@ -751,7 +752,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// Now the executor on hostA is lost
runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container
marked as failed")))
// Executor is removed but shuffle files are not unregistered
- verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
// The MapOutputTracker has all the shuffle files
@@ -764,9 +765,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0,
"ignored"), null)
))
- // blockManagerMaster.removeExecutor is not called again
+ // blockManagerMaster.removeExecutorAsync is not called again
// but shuffle files are unregistered
- verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
// Shuffle files for hostA-exec should be lost
@@ -1010,7 +1011,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
complete(taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0,
"ignored"), null)))
- verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
@@ -1050,7 +1051,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
case _ => false
}
runEvent(ExecutorLost("hostA-exec", event))
- verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
if (expectFileLoss) {
if (expectHostFileLoss) {
verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
@@ -1085,7 +1086,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
submit(reduceRdd, Array(0))
completeShuffleMapStageSuccessfully(0, 0, 1)
runEvent(ExecutorLost("hostA-exec", event))
- verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(blockManagerMaster, times(1)).removeExecutorAsync("hostA-exec")
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
@@ -2187,7 +2188,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// fail the third stage because hostA went down
completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
// TODO assert this:
- // blockManagerMaster.removeExecutor("hostA-exec")
+ // blockManagerMaster.removeExecutorAsync("hostA-exec")
// have DAGScheduler try again
scheduler.resubmitFailedStages()
complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
@@ -2213,7 +2214,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// pretend stage 2 failed because hostA went down
completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
// TODO assert this:
- // blockManagerMaster.removeExecutor("hostA-exec")
+ // blockManagerMaster.removeExecutorAsync("hostA-exec")
// DAGScheduler should notice the cached copy of the second shuffle and
try to get it rerun.
scheduler.resubmitFailedStages()
assertLocations(taskSets(3), Seq(Seq("hostD")))
@@ -5072,7 +5073,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
ExecutorExited(-100, false, "Container marked as failed")))
// Shuffle push merger executor should not be removed and the shuffle files are not unregistered
- verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+ verify(blockManagerMaster, times(0))
+ .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
@@ -5084,7 +5086,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// Verify that we are not removing the executor,
// and that we are only removing the outputs on the host
- verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+ verify(blockManagerMaster, times(0))
+ .removeExecutorAsync(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
verify(blockManagerMaster, times(1)).removeShufflePushMergerLocation("hostA")
verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
@@ -5472,6 +5475,67 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
}
}
+ test("SPARK-53564: RpcTimeout while submitting stage should not fail the
job/application") {
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ shuffleMapRdd.cache()
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+
+ val callCount = new AtomicInteger(0)
+ doAnswer(invocation => {
+ if (callCount.incrementAndGet() == 1) {
+ // First call for finding missing parents, throw RpcTimeoutException
+ throw new RpcTimeoutException("RpcTimeout", null)
+ } else {
+ invocation.callRealMethod()
+ }
+ }).when(blockManagerMaster)
+ .getLocations(any(classOf[Array[BlockId]]))
+
+ submit(reduceRdd, Array(0))
+ assert(scheduler.stageIdToStage.size === 2)
+ completeShuffleMapStageSuccessfully(0, 0, 2)
+ completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42))
+ assertDataStructuresEmpty()
+ verify(blockManagerMaster, times(2))
+ .getLocations(any(classOf[Array[BlockId]]))
+ }
+
+ test("SPARK-53564: Resubmit missing parent when failed to get cache
locations") {
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+ val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+ shuffleMapRdd1.cache()
+ val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(1))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep1), tracker = mapOutputTracker)
+
+ // Cache for shuffleMapRdd1 are available
+ cacheLocations(shuffleMapRdd1.id -> 0) = Seq(makeBlockManagerId("hostA"))
+ cacheLocations(shuffleMapRdd1.id -> 1) = Seq(makeBlockManagerId("hostB"))
+
+ val callCount = new AtomicInteger(0)
+ doAnswer(invocation => {
+ if (callCount.incrementAndGet() == 1) {
+ // First call for finding missing parents, throw RpcTimeoutException
+ throw new RpcTimeoutException("RpcTimeout", null)
+ } else {
+ invocation.callRealMethod()
+ }
+ }).when(blockManagerMaster)
+ .getLocations(any(classOf[Array[BlockId]]))
+
+ submit(reduceRdd, Array(0))
+ // All 3 stages should be submitted even though caches are available for shuffleMapRdd1
+ // but failed to get cache locations.
+ assert(scheduler.stageIdToStage.size === 3)
+ completeShuffleMapStageSuccessfully(0, 0, 2)
+ completeShuffleMapStageSuccessfully(1, 0, 2)
+ completeAndCheckAnswer(taskSets(2), Seq((Success, 42)), Map(0 -> 42))
+ assertDataStructuresEmpty()
+ verify(blockManagerMaster, times(2))
+ .getLocations(any(classOf[Array[BlockId]]))
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]