Repository: spark
Updated Branches:
  refs/heads/master 8b5b2e272 -> dccc0aa3c


[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave 
lost or fetch failure

## What changes were proposed in this pull request?

Currently, when we detect fetch failure, we only remove the shuffle files 
produced by the executor, while the host itself might be down and all the 
shuffle files are not accessible. In case we are running multiple executors on 
a host, any host going down currently results in multiple fetch failures and 
multiple retries of the stage, which is very inefficient. If we remove all the 
shuffle files on that host, on first fetch failure, we can rerun all the tasks 
on that host in a single stage retry.

## How was this patch tested?

Unit testing and also ran a job on the cluster and made sure multiple retries 
are gone.

Author: Sital Kedia <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #18150 from sitalkedia/cleanup_shuffle.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dccc0aa3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dccc0aa3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dccc0aa3

Branch: refs/heads/master
Commit: dccc0aa3cf957c8eceac598ac81ac82f03b52105
Parents: 8b5b2e2
Author: Sital Kedia <[email protected]>
Authored: Wed Jun 14 11:34:09 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 14 11:34:09 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     | 33 +++++++++-
 .../apache/spark/internal/config/package.scala  |  8 +++
 .../apache/spark/scheduler/DAGScheduler.scala   | 67 ++++++++++++++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     | 67 ++++++++++++++++++++
 4 files changed, 160 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dccc0aa3/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 3e10b9e..5d48bc7 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,8 @@ private class ShuffleStatus(numPartitions: Int) {
    * locations is so small that we choose to ignore that case and store only a 
single location
    * for each output.
    */
-  private[this] val mapStatuses = new Array[MapStatus](numPartitions)
+  // Exposed for testing
+  val mapStatuses = new Array[MapStatus](numPartitions)
 
   /**
    * The cached result of serializing the map statuses array. This cache is 
lazily populated when
@@ -106,13 +107,29 @@ private class ShuffleStatus(numPartitions: Int) {
   }
 
   /**
+   * Removes all shuffle outputs associated with this host. Note that this 
will also remove
+   * outputs which are served by an external shuffle server (if one exists).
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    removeOutputsByFilter(x => x.host == host)
+  }
+
+  /**
    * Removes all map outputs associated with the specified executor. Note that 
this will also
    * remove outputs which are served by an external shuffle server (if one 
exists), as they are
    * still registered with that execId.
    */
   def removeOutputsOnExecutor(execId: String): Unit = synchronized {
+    removeOutputsByFilter(x => x.executorId == execId)
+  }
+
+  /**
+   * Removes all shuffle outputs which satisfies the filter. Note that this 
will also
+   * remove outputs which are served by an external shuffle server (if one 
exists).
+   */
+  def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = 
synchronized {
     for (mapId <- 0 until mapStatuses.length) {
-      if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId 
== execId) {
+      if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) {
         _numAvailableOutputs -= 1
         mapStatuses(mapId) = null
         invalidateSerializedMapOutputStatusCache()
@@ -317,7 +334,8 @@ private[spark] class MapOutputTrackerMaster(
 
   // HashMap for storing shuffleStatuses in the driver.
   // Statuses are dropped only by explicit de-registering.
-  private val shuffleStatuses = new ConcurrentHashMap[Int, 
ShuffleStatus]().asScala
+  // Exposed for testing
+  val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
 
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
@@ -416,6 +434,15 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
+   * Removes all shuffle outputs associated with this host. Note that this 
will also remove
+   * outputs which are served by an external shuffle server (if one exists).
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnHost(host) }
+    incrementEpoch()
+  }
+
+  /**
    * Removes all shuffle outputs associated with this executor. Note that this 
will also remove
    * outputs which are served by an external shuffle server (if one exists), 
as they are still
    * registered with this execId.

http://git-wip-us.apache.org/repos/asf/spark/blob/dccc0aa3/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7827e67..84ef57f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -151,6 +151,14 @@ package object config {
       .createOptional
   // End blacklist confs
 
+  private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
+    ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost")
+      .doc("Whether to un-register all the outputs on the host in condition 
that we receive " +
+        " a FetchFailure. This is set default to false, which means, we only 
un-register the " +
+        " outputs related to the exact executor(instead of the host) on a 
FetchFailure.")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
       .withAlternative("spark.scheduler.listenerbus.eventqueue.size")

http://git-wip-us.apache.org/repos/asf/spark/blob/dccc0aa3/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 932e6c1..fafe9ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
@@ -188,6 +189,14 @@ class DAGScheduler(
   private val disallowStageRetryForTest = 
sc.getConf.getBoolean("spark.test.noStageRetry", false)
 
   /**
+   * Whether to unregister all the outputs on the host in condition that we 
receive a FetchFailure,
+   * this is set default to false, which means, we only unregister the outputs 
related to the exact
+   * executor(instead of the host) on a FetchFailure.
+   */
+  private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
+    sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)
+
+  /**
    * Number of consecutive stage attempts allowed before a stage is aborted.
    */
   private[scheduler] val maxConsecutiveStageAttempts =
@@ -1336,7 +1345,21 @@ class DAGScheduler(
 
           // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
           if (bmAddress != null) {
-            handleExecutorLost(bmAddress.executorId, filesLost = true, 
Some(task.epoch))
+            val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled &&
+              unRegisterOutputOnHostOnFetchFailure) {
+              // We had a fetch failure with the external shuffle service, so 
we
+              // assume all shuffle data on the node is bad.
+              Some(bmAddress.host)
+            } else {
+              // Unregister shuffle data just for one executor (we don't have 
any
+              // reason to believe shuffle data has been lost for the entire 
host).
+              None
+            }
+            removeExecutorAndUnregisterOutputs(
+              execId = bmAddress.executorId,
+              fileLost = true,
+              hostToUnregisterOutputs = hostToUnregisterOutputs,
+              maybeEpoch = Some(task.epoch))
           }
         }
 
@@ -1370,22 +1393,42 @@ class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      filesLost: Boolean,
-      maybeEpoch: Option[Long] = None) {
+      workerLost: Boolean): Unit = {
+    // if the cluster manager explicitly tells us that the entire worker was 
lost, then
+    // we know to unregister shuffle output.  (Note that "worker" specifically 
refers to the process
+    // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
+    val fileLost = workerLost || 
!env.blockManager.externalShuffleServiceEnabled
+    removeExecutorAndUnregisterOutputs(
+      execId = execId,
+      fileLost = fileLost,
+      hostToUnregisterOutputs = None,
+      maybeEpoch = None)
+  }
+
+  private def removeExecutorAndUnregisterOutputs(
+      execId: String,
+      fileLost: Boolean,
+      hostToUnregisterOutputs: Option[String],
+      maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-
-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch 
%d)".format(execId, currentEpoch))
-        mapOutputTracker.removeOutputsOnExecutor(execId)
+      if (fileLost) {
+        hostToUnregisterOutputs match {
+          case Some(host) =>
+            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, 
currentEpoch))
+            mapOutputTracker.removeOutputsOnHost(host)
+          case None =>
+            logInfo("Shuffle files lost for executor: %s (epoch 
%d)".format(execId, currentEpoch))
+            mapOutputTracker.removeOutputsOnExecutor(execId)
+        }
         clearCacheLocs()
+
+      } else {
+        logDebug("Additional executor lost message for %s (epoch 
%d)".format(execId, currentEpoch))
       }
-    } else {
-      logDebug("Additional executor lost message for " + execId +
-               "(epoch " + currentEpoch + ")")
     }
   }
 
@@ -1678,11 +1721,11 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)
 
     case ExecutorLost(execId, reason) =>
-      val filesLost = reason match {
+      val workerLost = reason match {
         case SlaveLost(_, true) => true
         case _ => false
       }
-      dagScheduler.handleExecutorLost(execId, filesLost)
+      dagScheduler.handleExecutorLost(execId, workerLost)
 
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)

http://git-wip-us.apache.org/repos/asf/spark/blob/dccc0aa3/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 67145e7..ddd3281 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -396,6 +396,73 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
+  test("All shuffle files on the slave should be cleaned up when slave lost") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
+    init(conf)
+    runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+    runEvent(ExecutorAdded("exec-hostA2", "hostA"))
+    runEvent(ExecutorAdded("exec-hostB", "hostB"))
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, new 
HashPartitioner(3))
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
+    val secondShuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+    // map stage1 completes successfully, with one task on each executor
+    complete(taskSets(0), Seq(
+      (Success,
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+      (Success,
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+      (Success, makeMapStatus("hostB", 1))
+    ))
+    // map stage2 completes successfully, with one task on each executor
+    complete(taskSets(1), Seq(
+      (Success,
+        MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+      (Success,
+        MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+      (Success, makeMapStatus("hostB", 1))
+    ))
+    // make sure our test setup is correct
+    val initialMapStatus1 = 
mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
+    //  val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+    assert(initialMapStatus1.count(_ != null) === 3)
+    assert(initialMapStatus1.map{_.location.executorId}.toSet ===
+      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+
+    val initialMapStatus2 = 
mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
+    //  val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+    assert(initialMapStatus2.count(_ != null) === 3)
+    assert(initialMapStatus2.map{_.location.executorId}.toSet ===
+      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+
+    // reduce stage fails with a fetch failure from one host
+    complete(taskSets(2), Seq(
+      (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), 
firstShuffleId, 0, 0, "ignored"),
+        null)
+    ))
+
+    // Here is the main assertion -- make sure that we de-register
+    // the map outputs for both map stage from both executors on hostA
+
+    val mapStatus1 = 
mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
+    assert(mapStatus1.count(_ != null) === 1)
+    assert(mapStatus1(2).location.executorId === "exec-hostB")
+    assert(mapStatus1(2).location.host === "hostB")
+
+    val mapStatus2 = 
mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
+    assert(mapStatus2.count(_ != null) === 1)
+    assert(mapStatus2(2).location.executorId === "exec-hostB")
+    assert(mapStatus2(2).location.host === "hostB")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to