This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1a6512205ea0 [SPARK-54556][CORE] Rollback succeeding shuffle map 
stages when shuffle checksum mismatch detected
1a6512205ea0 is described below

commit 1a6512205ea0dea26704b3aabce9a8d51a68e6a7
Author: Tengfei Huang <[email protected]>
AuthorDate: Sat Dec 20 00:03:08 2025 +0800

    [SPARK-54556][CORE] Rollback succeeding shuffle map stages when shuffle 
checksum mismatch detected
    
    ### What changes were proposed in this pull request?
    Rollback shuffle map stages when shuffle checksum mismatch detected:
    
    - cancel and resubmit the stage if it's running;
    - clean up the shuffle status to ensure it'll be resubmitted;
    - mark rollback attemptId and ignore the results from these elder attempts 
which may consume inconsistent data;
    
    ### Why are the changes needed?
    To ensure all the succeeding stages will be re-submitted and fully-retry 
when there is shuffle checksum mismatch detected.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53274 from ivoson/SPARK-54556.
    
    Authored-by: Tengfei Huang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 0da9e0505b59d145c6429a3113409dc33f9a1efc)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 221 ++++++++++++++++-----
 .../scala/org/apache/spark/scheduler/Stage.scala   |  16 ++
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 130 +++++++++++-
 3 files changed, 313 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7c8bea31334b..69e766ebcef2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1560,42 +1560,27 @@ private[spark] class DAGScheduler(
     // `findMissingPartitions()` returns all partitions every time.
     stage match {
       case sms: ShuffleMapStage if !sms.isAvailable =>
-        val needFullStageRetry = if 
(sms.shuffleDep.checksumMismatchFullRetryEnabled) {
-          // When the parents of this stage are indeterminate (e.g., some 
parents are not
-          // checkpointed and checksum mismatches are detected), the output 
data of the parents
-          // may have changed due to task retries. For correctness reason, we 
need to
-          // retry all tasks of the current stage. The legacy way of using 
current stage's
-          // deterministic level to trigger full stage retry is not accurate.
-          stage.isParentIndeterminate
-        } else {
-          if (stage.isIndeterminate) {
-            // already executed at least once
-            if (sms.getNextAttemptId > 0) {
-              // While we previously validated possible rollbacks during the 
handling of a FetchFailure,
-              // where we were fetching from an indeterminate source map 
stages, this later check
-              // covers additional cases like recalculating an indeterminate 
stage after an executor
-              // loss. Moreover, because this check occurs later in the 
process, if a result stage task
-              // has successfully completed, we can detect this and abort the 
job, as rolling back a
-              // result stage is not possible.
-              val stagesToRollback = collectSucceedingStages(sms)
-              abortStageWithInvalidRollBack(stagesToRollback)
-              // stages which cannot be rolled back were aborted which leads 
to removing the
-              // the dependant job(s) from the active jobs set
-              val numActiveJobsWithStageAfterRollback =
-                activeJobs.count(job => 
stagesToRollback.contains(job.finalStage))
-              if (numActiveJobsWithStageAfterRollback == 0) {
-                logInfo(log"All jobs depending on the indeterminate stage " +
-                  log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage 
is not needed anymore.")
-                return
-              }
+        if (!sms.shuffleDep.checksumMismatchFullRetryEnabled && 
stage.isIndeterminate) {
+          // already executed at least once
+          if (sms.getNextAttemptId > 0) {
+            // While we previously validated possible rollbacks during the 
handling of a FetchFailure,
+            // where we were fetching from an indeterminate source map stages, 
this later check
+            // covers additional cases like recalculating an indeterminate 
stage after an executor
+            // loss. Moreover, because this check occurs later in the process, 
if a result stage task
+            // has successfully completed, we can detect this and abort the 
job, as rolling back a
+            // result stage is not possible.
+            val stagesToRollback = collectSucceedingStages(sms)
+            filterAndAbortUnrollbackableStages(stagesToRollback)
+            // stages which cannot be rolled back were aborted which leads to 
removing the
+            // the dependant job(s) from the active jobs set
+            val numActiveJobsWithStageAfterRollback =
+              activeJobs.count(job => 
stagesToRollback.contains(job.finalStage))
+            if (numActiveJobsWithStageAfterRollback == 0) {
+              logInfo(log"All jobs depending on the indeterminate stage " +
+                log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is 
not needed anymore.")
+              return
             }
-            true
-          } else {
-            false
           }
-        }
-
-        if (needFullStageRetry) {
           
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
           sms.shuffleDep.newShuffleMergeState()
         }
@@ -1913,16 +1898,127 @@ private[spark] class DAGScheduler(
 
   /**
    * If a map stage is non-deterministic, the map tasks of the stage may 
return different result
-   * when re-try. To make sure data correctness, we need to re-try all the 
tasks of its succeeding
-   * stages, as the input data may be changed after the map tasks are 
re-tried. For stages where
-   * rollback and retry all tasks are not possible, we will need to abort the 
stages.
+   * when re-try. To make sure data correctness, we need to clean up shuffles 
to make sure succeeding
+   * stages will be resubmitted and re-try all the tasks, as the input data 
may be changed after
+   * the map tasks are re-tried. For stages where rollback and retry all tasks 
are not possible,
+   * we will need to abort the stages.
+   */
+  private[scheduler] def rollbackSucceedingStages(mapStage: ShuffleMapStage): 
Unit = {
+    val stagesToRollback = collectSucceedingStages(mapStage).filterNot(_ == 
mapStage)
+    val stagesCanRollback = 
filterAndAbortUnrollbackableStages(stagesToRollback)
+    // stages which cannot be rolled back were aborted which leads to removing 
the
+    // the dependant job(s) from the active jobs set, there could be no active 
jobs
+    // left depending on the indeterminate stage and hence no need to roll 
back any stages.
+    val numActiveJobsWithStageAfterRollback =
+      activeJobs.count(job => stagesToRollback.contains(job.finalStage))
+    if (numActiveJobsWithStageAfterRollback == 0) {
+      logInfo(log"All jobs depending on the indeterminate stage " +
+        log"(${MDC(STAGE_ID, mapStage.id)}) were aborted.")
+    } else {
+      // Mark rollback attempt to identify elder attempts which could consume 
inconsistent data,
+      // the results from these attempts should be ignored.
+      // Rollback the running stages first to avoid triggering more fetch 
failures.
+      stagesToRollback.toSeq.sortBy(!runningStages.contains(_)).foreach {
+        case sms: ShuffleMapStage =>
+          rollbackShuffleMapStage(sms, "rolling back due to indeterminate " +
+            s"output of shuffle map stage $mapStage")
+          sms.markAsRollingBack()
+
+        case rs: ResultStage =>
+          rs.markAsRollingBack()
+      }
+
+      logInfo(log"The shuffle map stage ${MDC(STAGE, mapStage)} with 
indeterminate output " +
+        log"was retried, we will roll back and rerun its succeeding " +
+        log"stages: ${MDC(STAGES, stagesCanRollback)}")
+    }
+  }
+
+  /**
+   * Roll back the given shuffle map stage:
+   * 1. If the stage is running, cancel the stage and kill all running tasks. 
Clean up the shuffle
+   *    output resubmit it if it's not exceeded max retries.
+   * 2. If the stage is not running but having output generated, clean up the 
shuffle output to
+   *    ensure the stage will be re-executed with fully retry.
+   *
+   * @param sms the shuffle map stage to roll back
+   * @param reason the reason for rolling back
+   */
+  private def rollbackShuffleMapStage(sms: ShuffleMapStage, reason: String): 
Unit = {
+    logInfo(log"Rolling back ${MDC(STAGE, sms)} due to indeterminate rollback")
+    val clearShuffle = if (runningStages.contains(sms)) {
+      logInfo(log"Stage ${MDC(STAGE, sms)} is running, marking it as failed 
and " +
+        log"resubmit if allowed")
+      cancelStageAndTryResubmit(sms, reason)
+    } else {
+      true
+    }
+
+    // Clean up shuffle outputs in case the stage is not aborted to ensure the 
stage
+    // will be re-executed.
+    if (clearShuffle) {
+      logInfo(log"Cleaning up shuffle for stage ${MDC(STAGE, sms)} to ensure 
re-execution")
+      mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
+      sms.shuffleDep.newShuffleMergeState()
+    }
+  }
+
+  /**
+   * Cancel the give running shuffle map stage, killing all running tasks, 
resubmit if it doesn't
+   * exceed max retries.
+   *
+   * @param stage the stage to cancel and resubmit
+   * @param reason the reason for the operation
+   * @return true if the stage is successfully cancelled and resubmitted, 
otherwise false
    */
-  private[scheduler] def abortUnrollbackableStages(mapStage: ShuffleMapStage): 
Unit = {
-    val stagesToRollback = collectSucceedingStages(mapStage)
-    val rollingBackStages = abortStageWithInvalidRollBack(stagesToRollback)
-    logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with 
indeterminate output " +
-      log"was failed, we will roll back and rerun below stages which include 
itself and all its " +
-      log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
+  private def cancelStageAndTryResubmit(stage: ShuffleMapStage, reason: 
String): Boolean = {
+    assert(runningStages.contains(stage), "stage must be running to be 
cancelled and resubmitted")
+    try {
+      // killAllTaskAttempts will fail if a SchedulerBackend does not 
implement killTask.
+      val job = jobIdToActiveJob.get(stage.firstJobId)
+      val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
+      taskScheduler.killAllTaskAttempts(stage.id, shouldInterrupt, reason)
+    } catch {
+      case e: UnsupportedOperationException =>
+        logWarning(log"Could not kill all tasks for stage ${MDC(STAGE_ID, 
stage.id)}", e)
+        abortStage(stage, "Rollback failed due to: Not able to kill running 
tasks for stage " +
+          s"$stage (${stage.name})", Some(e))
+        return false
+    }
+
+    stage.failedAttemptIds.add(stage.latestInfo.attemptNumber())
+    val shouldAbortStage = stage.failedAttemptIds.size >= 
maxConsecutiveStageAttempts ||
+      disallowStageRetryForTest
+    markStageAsFinished(stage, Some(reason), willRetry = !shouldAbortStage)
+
+    if (shouldAbortStage) {
+      val abortMessage = if (disallowStageRetryForTest) {
+        "Stage will not retry stage due to testing config. Most recent failure 
" +
+          s"reason: $reason"
+      } else {
+        s"$stage (${stage.name}) has failed the maximum allowable number of " +
+          s"times: $maxConsecutiveStageAttempts. Most recent failure reason: 
$reason"
+      }
+      abortStage(stage, s"rollback failed due to: $abortMessage", None)
+    } else {
+      // In case multiple task failures triggered for a single stage attempt, 
ensure we only
+      // resubmit the failed stage once.
+      val noResubmitEnqueued = !failedStages.contains(stage)
+      failedStages += stage
+      if (noResubmitEnqueued) {
+        logInfo(log"Resubmitting ${MDC(FAILED_STAGE, stage)} " +
+          log"(${MDC(FAILED_STAGE_NAME, stage.name)}) due to rollback.")
+        messageScheduler.schedule(
+          new Runnable {
+            override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
+          },
+          DAGScheduler.RESUBMIT_TIMEOUT,
+          TimeUnit.MILLISECONDS
+        )
+      }
+    }
+
+    !shouldAbortStage
   }
 
   /**
@@ -1990,7 +2086,21 @@ private[spark] class DAGScheduler(
         // tasks complete, they still count and we can mark the corresponding 
partitions as
         // finished if the stage is determinate. Here we notify the task 
scheduler to skip running
         // tasks for the same partition to save resource.
-        if (!stage.isIndeterminate && task.stageAttemptId < 
stage.latestInfo.attemptNumber()) {
+        def stageWithChecksumMismatchFullRetryEnabled(stage: Stage): Boolean = 
{
+          stage match {
+            case s: ShuffleMapStage => 
s.shuffleDep.checksumMismatchFullRetryEnabled
+            case _ => 
stage.parents.exists(stageWithChecksumMismatchFullRetryEnabled)
+          }
+        }
+
+        // Ignore task completion for old attempt of indeterminate stage
+        val ignoreOldTaskAttempts = if 
(stageWithChecksumMismatchFullRetryEnabled(stage)) {
+          stage.maxAttemptIdToIgnore.exists(_ >= task.stageAttemptId)
+        } else {
+          stage.isIndeterminate && task.stageAttemptId < 
stage.latestInfo.attemptNumber()
+        }
+
+        if (!ignoreOldTaskAttempts && task.stageAttemptId < 
stage.latestInfo.attemptNumber()) {
           taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
         }
 
@@ -2002,6 +2112,13 @@ private[spark] class DAGScheduler(
             resultStage.activeJob match {
               case Some(job) =>
                 if (!job.finished(rt.outputId)) {
+                  if (ignoreOldTaskAttempts) {
+                    val reason = "Task with indeterminate results from old 
attempt succeeded, " +
+                      s"aborting the stage $resultStage to ensure data 
correctness."
+                    abortStage(resultStage, reason, None)
+                    return
+                  }
+
                   job.finished(rt.outputId) = true
                   job.numFinished += 1
                   // If the whole job has finished, remove it
@@ -2045,10 +2162,7 @@ private[spark] class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            // Ignore task completion for old attempt of indeterminate stage
-            val ignoreIndeterminate = stage.isIndeterminate &&
-              task.stageAttemptId < stage.latestInfo.attemptNumber()
-            if (!ignoreIndeterminate) {
+            if (!ignoreOldTaskAttempts) {
               shuffleStage.pendingPartitions -= task.partitionId
               val status = event.result.asInstanceOf[MapStatus]
               val execId = status.location.executorId
@@ -2077,7 +2191,7 @@ private[spark] class DAGScheduler(
                     shuffleStage.maxChecksumMismatchedId = smt.stageAttemptId
                     if 
(shuffleStage.shuffleDep.checksumMismatchFullRetryEnabled
                       && shuffleStage.isStageIndeterminate) {
-                      abortUnrollbackableStages(shuffleStage)
+                      rollbackSucceedingStages(shuffleStage)
                     }
                   }
                 }
@@ -2206,7 +2320,11 @@ private[spark] class DAGScheduler(
               // guaranteed to be determinate, so the input data of the 
reducers will not change
               // even if the map tasks are re-tried.
               if (mapStage.isIndeterminate && 
!mapStage.shuffleDep.checksumMismatchFullRetryEnabled) {
-                abortUnrollbackableStages(mapStage)
+                val stagesToRollback = collectSucceedingStages(mapStage)
+                val stagesCanRollback = 
filterAndAbortUnrollbackableStages(stagesToRollback)
+                logInfo(log"The shuffle map stage ${MDC(STAGE, mapStage)} with 
indeterminate output " +
+                  log"was failed, we will roll back and rerun below stages 
which include itself and all " +
+                  log"its indeterminate child stages: ${MDC(STAGES, 
stagesCanRollback)}")
               }
 
               // We expect one executor failure to trigger many FetchFailures 
in rapid succession,
@@ -2396,7 +2514,8 @@ private[spark] class DAGScheduler(
    * @param stagesToRollback stages to roll back
    * @return Shuffle map stages which need and can be rolled back
    */
-  private def abortStageWithInvalidRollBack(stagesToRollback: HashSet[Stage]): 
HashSet[Stage] = {
+  private def filterAndAbortUnrollbackableStages(
+      stagesToRollback: HashSet[Stage]): HashSet[Stage] = {
 
     def generateErrorMessage(stage: Stage): String = {
       "A shuffle map stage with indeterminate output was failed and retried. " 
+
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 9bf604e9a83c..d8aaea013ee6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -84,6 +84,14 @@ private[scheduler] abstract class Stage(
    */
   private[scheduler] var maxChecksumMismatchedId: Int = nextAttemptId
 
+  /**
+   * The max attempt id we should ignore results for this stage, indicating 
there are ancestor
+   * stages having been detected with checksum mismatches. This stage is 
probably also
+   * indeterminate, so we need to avoid completing the stage and the job with 
incorrect result
+   * by ignoring the task output from previous attempts which might consume 
inconsistent data
+   */
+  private[scheduler] var maxAttemptIdToIgnore: Option[Int] = None
+
   val name: String = callSite.shortForm
   val details: String = callSite.longForm
 
@@ -108,6 +116,14 @@ private[scheduler] abstract class Stage(
     failedAttemptIds.clear()
   }
 
+  /** Mark the latest attempt as rollback */
+  private[scheduler] def markAsRollingBack(): Unit = {
+    // Only if the stage has been submitted
+    if (getNextAttemptId > 0) {
+      maxAttemptIdToIgnore = Some(latestInfo.attemptNumber())
+    }
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a 
new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6ec0ea320eaa..48f1c49e7af2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3421,11 +3421,12 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
       stageId: Int,
       shuffleId: Int,
       numTasks: Int = 2,
-      checksumVal: Long = 0): Unit = {
+      checksumVal: Long = 0,
+      stageAttemptId: Int = 1): Unit = {
     assert(taskSets(taskSetIndex).stageId == stageId)
-    assert(taskSets(taskSetIndex).stageAttemptId == 1)
+    assert(taskSets(taskSetIndex).stageAttemptId == stageAttemptId)
     assert(taskSets(taskSetIndex).tasks.length == numTasks)
-    completeShuffleMapStageSuccessfully(stageId, 1, 2, checksumVal = 
checksumVal)
+    completeShuffleMapStageSuccessfully(stageId, stageAttemptId, 2, 
checksumVal = checksumVal)
     assert(mapOutputTracker.findMissingPartitions(shuffleId) === 
Some(Seq.empty))
   }
 
@@ -3835,6 +3836,129 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-54556: ensure rollback all the succeeding stages and ignore 
stale task results " +
+    "when shuffle checksum mismatch detected") {
+    /**
+     * Construct the following RDD graph:
+     *
+     *        ShuffleMapRdd1 (Indeterminate)
+     *            /      \
+     *    ShuffleMapRdd2  \
+     *          /          |
+     *    ShuffleMapRdd3  |
+     *           \       |
+     *           FinalRd
+     *
+     * While executing the result stage, shuffle fetch failed on shuffle1 and 
leading to executor
+     * loss and some map output of shuffle2 lost.
+     * Both stage 0 and stage 2 will be submitted.
+     * Checksum mismatch is detected when retrying stage 0.
+     * Retry task of stage 2 completed and should be ignored.
+     */
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
+    val shuffleDep1 = new ShuffleDependency(
+      shuffleMapRdd1,
+      new HashPartitioner(2),
+      checksumMismatchFullRetryEnabled = true)
+    val shuffleId1 = shuffleDep1.shuffleId
+
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = 
mapOutputTracker)
+    val shuffleDep2 = new ShuffleDependency(
+      shuffleMapRdd2,
+      new HashPartitioner(2),
+      checksumMismatchFullRetryEnabled = true)
+    val shuffleId2 = shuffleDep2.shuffleId
+
+    val shuffleMapRdd3 = new MyRDD(sc, 2, List(shuffleDep2), tracker = 
mapOutputTracker)
+    val shuffleDep3 = new ShuffleDependency(
+      shuffleMapRdd3,
+      new HashPartitioner(2),
+      checksumMismatchFullRetryEnabled = true)
+    val shuffleId3 = shuffleDep3.shuffleId
+
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep3), tracker = 
mapOutputTracker)
+
+    // Submit the job and complete the shuffle stages
+    submit(finalRdd, Array(0, 1))
+    completeShuffleMapStageSuccessfully(
+      0, 0, 2, Seq("hostA", "hostB"), checksumVal = 100)
+    completeShuffleMapStageSuccessfully(
+      1, 0, 2, Seq("hostC", "hostD"), checksumVal = 200)
+    completeShuffleMapStageSuccessfully(
+      2, 0, 2, Seq("hostB", "hostC"), checksumVal = 300)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId3) === 
Some(Seq.empty))
+
+    // The first task of result stage 3 failed with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostB"), shuffleId1, 0L, 0, 0, 
"ignored"),
+      null))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId3).nonEmpty)
+
+    // Check status for all failedStages.
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.map(_.id) === Seq(0, 3))
+    scheduler.resubmitFailedStages()
+    // Check status for runningStages.
+    assert(scheduler.runningStages.map(_.id) === Set(0, 2))
+
+    // Complete the re-attempt of shuffle map stage 0(shuffleId1) with a 
different checksum.
+    completeShuffleMapStageSuccessfully(0, 1, 2, checksumVal = 101)
+    completeShuffleMapStageSuccessfully(2, 1, 2, checksumVal = 300)
+    // The result of stage 2 should be ignored
+    assert(mapOutputTracker.getNumAvailableOutputs(shuffleId3) === 0)
+    scheduler.resubmitFailedStages()
+    assert(scheduler.runningStages.map(_.id) === Set(1))
+
+    checkAndCompleteRetryStage(6, 1, shuffleId2, 2, checksumVal = 201)
+    checkAndCompleteRetryStage(7, 2, shuffleId3, 2, checksumVal = 301, 
stageAttemptId = 2)
+    completeAndCheckAnswer(taskSets(8), Seq((Success, 11), (Success, 12)), 
Map(0 -> 11, 1 -> 12))
+  }
+
+  test("SPARK-54556: abort stage if result task from old attempt with 
indeterminate " +
+    "result succeeded") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
+    val shuffleDep1 = new ShuffleDependency(
+      shuffleMapRdd1,
+      new HashPartitioner(2),
+      checksumMismatchFullRetryEnabled = true)
+    val shuffleId1 = shuffleDep1.shuffleId
+
+    // Submit a job depending on shuffleDep1
+    val finalRdd1 = new MyRDD(
+      sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+    submit(finalRdd1, Array(0, 1))
+
+    // Finish stage 0.
+    completeShuffleMapStageSuccessfully(
+      0, 0, 2, Seq("hostA", "hostB"), checksumVal = 100)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+
+    // The first task of result stage failed with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, 
"ignored"),
+      null))
+
+    // Check status for all failedStages.
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.map(_.id) == Seq(0, 1))
+    scheduler.resubmitFailedStages()
+
+    // Complete the shuffle map stage with a different checksum
+    completeShuffleMapStageSuccessfully(0, 1, 2, checksumVal = 101)
+
+    // Complete the second task of 1st attempt of result stage.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      42))
+    assert(failure != null && failure.getMessage.contains(
+      "Task with indeterminate results from old attempt succeeded"))
+  }
+
   test("SPARK-27164: RDD.countApprox on empty RDDs schedules jobs which never 
complete") {
     val latch = new CountDownLatch(1)
     val jobListener = new SparkListener {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to