Repository: spark
Updated Branches:
  refs/heads/master e70aff6c2 -> bf719056b


[SPARK-3224] FetchFailed reduce stages should only show up once in failed stages (in UI)

This is a HOTFIX for 1.1.
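
For context, the symptom: when a map output is lost, every reduce task reading
it can fail independently, so the scheduler may receive several FetchFailed
events for the same reduce stage, and each one re-added the stage to the
failed-stages list shown in the UI. An illustrative sketch of that pre-fix
behavior (names here are hypothetical, not DAGScheduler's API):

    case class FetchFailedEvent(stageId: Int, lostMapId: Int)

    // Two reduce tasks, running concurrently on different executors, both
    // fail fetching outputs of the same lost map stage:
    val events = Seq(FetchFailedEvent(stageId = 1, lostMapId = 0),
                     FetchFailedEvent(stageId = 1, lostMapId = 1))

    // Before this patch, each event marked the stage as failed again, so
    // stage 1 showed up twice in the UI's failed stages:
    val shownInUi = events.map(_.stageId)  // Seq(1, 1)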

Author: Reynold Xin <[email protected]>
Author: Kay Ousterhout <[email protected]>

Closes #2127 from rxin/SPARK-3224 and squashes the following commits:

effb1ce [Reynold Xin] Move log message.
49282b3 [Reynold Xin] Kay's feedback.
3f01847 [Reynold Xin] Merge pull request #2 from kayousterhout/SPARK-3224
796d282 [Kay Ousterhout] Added unit test for SPARK-3224
3d3d356 [Reynold Xin] Remove map output loc even for repeated FetchFaileds.
1dd3eb5 [Reynold Xin] [SPARK-3224] FetchFailed reduce stages should only show up once in the failed stages UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf719056
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf719056
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf719056

Branch: refs/heads/master
Commit: bf719056b71d55e1194554661dfa194ed03d364d
Parents: e70aff6
Author: Reynold Xin <[email protected]>
Authored: Tue Aug 26 21:59:48 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Tue Aug 26 22:12:37 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 32 +++++++++------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 41 +++++++++++++++++++-
 2 files changed, 59 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf719056/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3413198..2ccc273 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1045,31 +1045,39 @@ class DAGScheduler(
         stage.pendingTasks += task
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
-        // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
-        markStageAsFinished(failedStage, Some("Fetch failure"))
-        runningStages -= failedStage
-        // TODO: Cancel running tasks in the stage
-        logInfo("Marking " + failedStage + " (" + failedStage.name +
-          ") for resubmision due to a fetch failure")
-        // Mark the map whose fetch failed as broken in the map stage
         val mapStage = shuffleToMapStage(shuffleId)
-        if (mapId != -1) {
-          mapStage.removeOutputLoc(mapId, bmAddress)
-          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+
+        // It is likely that we receive multiple FetchFailed for a single stage (because we have
+        // multiple tasks running concurrently on different executors). In that case, it is possible
+        // the fetch failure has already been handled by the scheduler.
+        if (runningStages.contains(failedStage)) {
+          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
+            s"due to a fetch failure from $mapStage (${mapStage.name})")
+          markStageAsFinished(failedStage, Some("Fetch failure"))
+          runningStages -= failedStage
         }
-        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name 
+
-          "); marking it for resubmission")
+
         if (failedStages.isEmpty && eventProcessActor != null) {
           // Don't schedule an event to resubmit failed stages if failed isn't empty, because
           // in that case the event will already have been scheduled. eventProcessActor may be
           // null during unit tests.
+          // TODO: Cancel running tasks in the stage
           import env.actorSystem.dispatcher
+          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+            s"$failedStage (${failedStage.name}) due to fetch failure")
           env.actorSystem.scheduler.scheduleOnce(
             RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
         }
         failedStages += failedStage
         failedStages += mapStage
+
+        // Mark the map whose fetch failed as broken in the map stage
+        if (mapId != -1) {
+          mapStage.removeOutputLoc(mapId, bmAddress)
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
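
The reordering above separates the once-only work from the every-time work: a
minimal sketch of the resulting pattern, using hypothetical names in place of
the scheduler's real state (runningStages, mapOutputTracker):

    import scala.collection.mutable

    // Stripped-down model of the handler: stage 1 is currently running.
    val runningStages = mutable.Set(1)
    val lostMapOutputs = mutable.Set.empty[(Int, Int)]  // (shuffleId, mapId)

    def handleFetchFailed(stageId: Int, shuffleId: Int, mapId: Int): Unit = {
      // Only the first FetchFailed for a stage moves it out of "running",
      // so listeners (and the UI) see a single stage-failed event.
      if (runningStages.contains(stageId)) {
        runningStages -= stageId
      }
      // The lost map output is recorded even on repeated FetchFaileds, per
      // the squashed commit "Remove map output loc even for repeated
      // FetchFaileds".
      if (mapId != -1) {
        lostMapOutputs += ((shuffleId, mapId))
      }
    }

    handleFetchFailed(1, shuffleId = 0, mapId = 0)
    handleFetchFailed(1, shuffleId = 0, mapId = 1)  // duplicate for stage 1
    assert(runningStages.isEmpty && lostMapOutputs.size == 2)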

http://git-wip-us.apache.org/repos/asf/spark/blob/bf719056/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index bd82975..f5fed98 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.{HashSet, HashMap, Map}
+import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
 import akka.actor._
@@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val WAIT_TIMEOUT_MILLIS = 10000
   val sparkListener = new SparkListener() {
     val successfulStages = new HashSet[Int]()
-    val failedStages = new HashSet[Int]()
+    val failedStages = new ArrayBuffer[Int]()
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
       if (stageInfo.failureReason.isEmpty) {
@@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("trivial shuffle with multiple fetch failures") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+    // The MapOutputTracker should know about both map output locations.
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
+      Array("hostA", "hostB"))
+
+    // The first result task fails, with a fetch failure for the output from the first mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+
+    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    // The SparkListener should not receive redundant failure events.
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.size == 1)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

