Repository: spark
Updated Branches:
  refs/heads/master 21c7d3c31 -> 729ce3703


[SPARK-19263] DAGScheduler should avoid sending conflicting task sets.

In the current `DAGScheduler.handleTaskCompletion` code, when `event.reason` is 
`Success`, the scheduler first does `stage.pendingPartitions -= task.partitionId`, 
which can be a bug when a `FetchFailed` has occurred earlier.
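
To make the ordering problem concrete, here is a minimal sketch of the pre-fix 
control flow, using simplified stand-in types rather than the real `DAGScheduler` 
state:

```scala
import scala.collection.mutable

// Stand-in state for one ShuffleMapStage (simplified; not Spark's real types).
object PreFixSketch {
  val pendingPartitions = mutable.HashSet[Int]()     // partitions still awaited
  val failedEpoch = mutable.HashMap[String, Long]()  // execId -> last known failure epoch

  // Pre-fix shape of the Success branch for a ShuffleMapTask.
  def handleSuccess(partitionId: Int, execId: String, taskEpoch: Long): Unit = {
    pendingPartitions -= partitionId  // removed unconditionally first...
    if (failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId)) {
      // ...but a stale-epoch completion is ignored here, so the stage can run
      // out of pending partitions while its map output is still missing.
      println(s"Ignoring possibly bogus completion from executor $execId")
    } else {
      // register the map output location (elided)
    }
  }
}
```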

**Consider the following scenario**

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It 
has two tasks, ShuffleMapTask1 and ShuffleMapTask2, both launched on 
executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the 
driver. The driver marks executorA as lost and updates failedEpoch.
4. The driver resubmits stage 0 so the missing output can be re-generated, and 
then once it completes, resubmits stage 1 with ShuffleMapTask1x and 
ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes 
on executorA and sends Success back to the driver. This causes 
DAGScheduler::handleTaskCompletion to remove partition 2 from 
stage.pendingPartitions (line 1149), but it does not add the partition to the 
set of output locations (line 1192), because the task's epoch is less than 
the failure epoch for the executor (due to the earlier failure on executorA).
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to 
remove partition 1 from stage.pendingPartitions. Combined with the previous 
step, this means that there are no more pending partitions for the stage, so 
the DAGScheduler marks the stage as finished (line 1196). However, the shuffle 
stage is not available (line 1215), because the completion for ShuffleMapTask2 
was ignored due to its stale epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is 
called for the re-submitted stage, it throws an error because there is already 
an active task set for that stage (see the sketch after this list).
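
That final error comes from a guard in `TaskSchedulerImpl.submitTasks` that 
rejects a second live task set for the same stage. Below is a simplified, 
self-contained sketch of that check; the stub types and names are illustrative, 
not Spark's exact internals:

```scala
import scala.collection.mutable

object SubmitTasksSketch {
  // Hypothetical stub for a TaskSetManager: only the liveness flag matters here.
  case class TaskSetManagerStub(stageAttemptId: Int, var isZombie: Boolean)

  // stageId -> (stageAttemptId -> manager), mirroring the scheduler's bookkeeping.
  val taskSetsByStage = mutable.HashMap[Int, mutable.HashMap[Int, TaskSetManagerStub]]()

  def submitTasks(stageId: Int, stageAttemptId: Int): Unit = {
    val stageTaskSets = taskSetsByStage.getOrElseUpdate(stageId, mutable.HashMap())
    // If any earlier attempt of this stage still has a live (non-zombie) task
    // set, submitting another one is an illegal state, which is exactly what
    // the scenario above provokes.
    if (stageTaskSets.values.exists(!_.isZombie)) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stageId")
    }
    stageTaskSets(stageAttemptId) = TaskSetManagerStub(stageAttemptId, isZombie = false)
  }
}
```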

**In this fix**

If a task completion is from a previous stage attempt and the epoch is too low
(i.e., it was from a failed executor), don't remove the corresponding partition
from pendingPartitions.
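
In condensed form, reusing the stand-in types from the first sketch (the 
authoritative change is the `DAGScheduler.scala` hunk in the diff below):

```scala
import scala.collection.mutable

object PostFixSketch {
  val pendingPartitions = mutable.HashSet[Int]()
  val failedEpoch = mutable.HashMap[String, Long]()

  def handleSuccess(partitionId: Int, execId: String, taskEpoch: Long,
      taskStageAttemptId: Int, latestStageAttemptId: Int): Unit = {
    if (taskStageAttemptId == latestStageAttemptId) {
      // Only the currently running attempt's TaskSetManager regards this task
      // as done, so only then is the partition dropped up front.
      pendingPartitions -= partitionId
    }
    if (failedEpoch.contains(execId) && taskEpoch <= failedEpoch(execId)) {
      // Stale epoch: ignore the output. For a task from an earlier attempt the
      // partition stays pending, so the stage cannot be marked finished while
      // its output is still missing.
      println(s"Ignoring possibly bogus completion from executor $execId")
    } else {
      // Register the map output (elided) and drop the partition; this also
      // covers valid late completions from earlier stage attempts.
      pendingPartitions -= partitionId
    }
  }
}
```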

Author: jinxing <[email protected]>
Author: jinxing <[email protected]>

Closes #16620 from jinxing64/SPARK-19263.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/729ce370
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/729ce370
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/729ce370

Branch: refs/heads/master
Commit: 729ce3703257aa34c00c5c8253e6971faf6a0c8d
Parents: 21c7d3c
Author: jinxing <[email protected]>
Authored: Sat Feb 18 10:49:40 2017 -0400
Committer: Kay Ousterhout <[email protected]>
Committed: Sat Feb 18 10:55:18 2017 -0400

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 22 +++++-
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 70 ++++++++++++++++++++
 3 files changed, 91 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 69101ac..0b7d371 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,33 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            shuffleStage.pendingPartitions -= task.partitionId
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
+            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+              // This task was for the currently running attempt of the stage. Since the task
+              // completed successfully from the perspective of the TaskSetManager, mark it as
+              // no longer pending (the TaskSetManager may consider the task complete even
+              // when the output needs to be ignored because the task's epoch is too small below.
+              // In this case, when pending partitions is empty, there will still be missing
+              // output locations, which will cause the DAGScheduler to resubmit the stage below.)
+              shuffleStage.pendingPartitions -= task.partitionId
+            }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
+              // The epoch of the task is acceptable (i.e., the task was launched after the most
+              // recent failure we're aware of for the executor), so mark the task's output as
+              // available.
               shuffleStage.addOutputLoc(smt.partitionId, status)
+              // Remove the task's partition from pending partitions. This may have already been
+              // done above, but will not have been done yet in cases where the task attempt was
+              // from an earlier attempt of the stage (i.e., not the attempt that's currently
+              // running).  This allows the DAGScheduler to mark the stage as complete when one
+              // copy of each task has finished successfully, even if the currently active stage
+              // still has tasks running.
+              shuffleStage.pendingPartitions -= task.partitionId
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1231,7 @@ class DAGScheduler(
               clearCacheLocs()
 
               if (!shuffleStage.isAvailable) {
-                // Some tasks had failed; let's resubmit this shuffleStage
+                // Some tasks had failed; let's resubmit this shuffleStage.
                 // TODO: Lower-level scheduler should also deal with this
                 logInfo("Resubmitting " + shuffleStage + " (" + 
shuffleStage.name +
                   ") because some of its tasks had failed: " +

http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c6fc038..32e5df6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
   val details: String = callSite.longForm
 
   /**
-   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
    * here, before any attempts have actually been created, because the DAGScheduler uses this
    * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
    * have been created).

http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4e5f267..c735220 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  test("[SPARK-19263] DAGScheduler should not submit multiple active 
tasksets," +
+      " even with late completions from earlier stage attempts") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
+    // and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
+    assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them on hostB successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
+    // successfully. The success should be ignored because the task started before the
+    // executor failed, so the output may have been lost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
+
+    // Both tasks in rddB should be resubmitted, because none of them has truly succeeded.
+    // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
+    // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
+    // is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
+      && taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // There should be no new attempt of the stage submitted, because
+    // task(stageId=1, stageAttempt=1, partitionId=1) is still running in
+    // the current attempt (and hasn't completed successfully in any earlier attempt).
+    assert(taskSets.size === 4)
+
+    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Now the ResultStage should be submitted, because all of the tasks of rddB have
+    // completed successfully on alive executors.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
+      (Success, 1),
+      (Success, 1)))
+  }
+
   /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
