Repository: spark
Updated Branches:
  refs/heads/branch-1.0 15fd9f2bb -> b4b0a54cf


[SPARK-2204] Launch tasks on the proper executors in mesos fine-grained mode

The scheduler for Mesos in fine-grained mode launches tasks on the wrong
executors. `MesosSchedulerBackend.resourceOffers(SchedulerDriver, List[Offer])`
assumes that `TaskSchedulerImpl.resourceOffers(Seq[WorkerOffer])` returns task
lists in the same order as the offers it was passed. The current implementation
of `TaskSchedulerImpl.resourceOffers`, however, shuffles the offers so that tasks
are not always assigned to the same executors. As a result, tasks are launched
on the wrong executors. Jobs sometimes complete, but most of the time they fail.
It seems that as soon as a task goes wrong for any reason, Spark cannot recover,
because it is mistaken about where the tasks are actually running. Also, the
more heavily loaded the cluster is, the more likely the job is to fail, because
there is a higher probability that Spark tries to launch a task on a slave that
doesn't actually have enough resources, again because it is using the wrong
offers.
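A minimal, self-contained sketch of the ordering problem described above. `DemoOffer`, `DemoTask`, and `schedule` are hypothetical, simplified stand-ins rather than Spark classes, and a fixed reversal stands in for the scheduler's random shuffle so the outcome is deterministic. It contrasts mapping task lists back to offers by position (the buggy assumption) with using the executor chosen for each task.

```scala
// Hypothetical, simplified stand-ins; Spark's real WorkerOffer and
// TaskDescription carry more fields.
case class DemoOffer(slaveId: String)
case class DemoTask(taskId: Long, executorId: String)

object OrderAssumptionDemo {
  // Mimics a scheduler that reorders the offers before assigning one task per
  // offer: the i-th returned list belongs to reorder(offers)(i), not offers(i).
  def schedule(offers: Seq[DemoOffer],
               reorder: Seq[DemoOffer] => Seq[DemoOffer]): Seq[Seq[DemoTask]] =
    reorder(offers).zipWithIndex.map { case (o, i) =>
      Seq(DemoTask(i.toLong, o.slaveId))
    }

  // Buggy pattern: map each task list back to an offer by its position.
  def launchByPosition(offers: Seq[DemoOffer],
                       taskLists: Seq[Seq[DemoTask]]): Map[Long, String] =
    taskLists.zipWithIndex.flatMap { case (tasks, i) =>
      tasks.map(t => t.taskId -> offers(i).slaveId)
    }.toMap

  // Fixed pattern: use the executor the scheduler chose for each task.
  def launchByExecutor(taskLists: Seq[Seq[DemoTask]]): Map[Long, String] =
    taskLists.flatten.map(t => t.taskId -> t.executorId).toMap

  def main(args: Array[String]): Unit = {
    val offers = Seq(DemoOffer("slave-1"), DemoOffer("slave-2"))
    val taskLists = schedule(offers, _.reverse) // deterministic stand-in for a shuffle
    println(launchByPosition(offers, taskLists)) // each task lands on the wrong slave
    println(launchByExecutor(taskLists))         // each task lands where it was scheduled
  }
}
```

Passing `s => scala.util.Random.shuffle(s)` as `reorder` reproduces the nondeterminism: the positional mapping is then only correct when the shuffle happens to leave the order unchanged.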

The solution is to stop assuming that the tasks are returned in the same order
as the offers, and simply launch each task on the executor chosen by
`TaskSchedulerImpl.resourceOffers`. One thing I am not sure about: I treated
slaveId and executorId as the same, which is true at least in my setup, but I
don't know whether that is always the case.
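The lookup strategy above can be sketched as follows, mirroring the `HashMap[String, Int]` of slave ID to offer index introduced in the diff. `MemOffer`, `AssignedTask`, `assign`, and the memory threshold are hypothetical simplifications. As in the commit, the sketch assumes a task's `executorId` equals the slave ID; within this backend each `WorkerOffer` is built with `offer.getSlaveId.getValue` as its first argument.

```scala
import scala.collection.mutable

// Hypothetical simplified types, not Spark's or Mesos's classes.
case class MemOffer(slaveId: String, memMb: Int)
case class AssignedTask(taskId: Long, executorId: String)

object OfferLookupByExecutor {
  /** For each index in `offers`, the IDs of the tasks to launch with that offer. */
  def assign(offers: Seq[MemOffer],
             minMemMb: Int,
             schedule: Seq[String] => Seq[Seq[AssignedTask]]): Vector[List[Long]] = {
    // Map each offerable slave ID to its index in the ORIGINAL offers list,
    // mirroring offerableIndices.put(offer.getSlaveId.getValue, index).
    val offerableIndices = new mutable.HashMap[String, Int]
    val offerableSlaves = mutable.ArrayBuffer.empty[String]
    for ((offer, index) <- offers.zipWithIndex if offer.memMb >= minMemMb) {
      offerableIndices.put(offer.slaveId, index)
      offerableSlaves += offer.slaveId
    }

    val perOffer = Array.fill(offers.size)(List.empty[Long])
    // The order of the returned lists no longer matters: each task is routed
    // by its executorId, which this sketch assumes equals the slave ID.
    for (taskList <- schedule(offerableSlaves.toList); task <- taskList) {
      val offerNum = offerableIndices(task.executorId)
      perOffer(offerNum) = perOffer(offerNum) :+ task.taskId
    }
    perOffer.toVector
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(MemOffer("a", 2048), MemOffer("b", 128), MemOffer("c", 2048))
    // A scheduler stand-in that returns its lists in reverse order.
    val reversing: Seq[String] => Seq[Seq[AssignedTask]] =
      slaves => slaves.reverse.zipWithIndex.map { case (s, i) => Seq(AssignedTask(i.toLong, s)) }
    println(assign(offers, 512, reversing))
  }
}
```

One consequence of keying by slave ID: if a single batch contained two offers from the same slave, the second `put` would overwrite the first index, so this approach implicitly assumes at most one offer per slave per callback.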

I tested this on top of the 1.0.0 release and it seems to work fine on our 
cluster.

Author: Sebastien Rainville <[email protected]>

Closes #1140 from sebastienrainville/fine-grained-mode-fix-master and squashes 
the following commits:

a98b0e0 [Sebastien Rainville] Use a HashMap to retrieve the offer indices
d6ffe54 [Sebastien Rainville] Launch tasks on the proper executors in mesos fine-grained mode
(cherry picked from commit 1132e472eca1a00c2ce10d2f84e8f0e79a5193d3)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4b0a54c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4b0a54c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4b0a54c

Branch: refs/heads/branch-1.0
Commit: b4b0a54cf285614b7926c3c32b8ec652cf6ccef3
Parents: 15fd9f2
Author: Sebastien Rainville <[email protected]>
Authored: Wed Jun 25 13:21:18 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Jun 25 13:21:45 2014 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosSchedulerBackend.scala          | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4b0a54c/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index f08b19e..100dfaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -185,8 +185,8 @@ private[spark] class MesosSchedulerBackend(
       synchronized {
         // Build a big list of the offerable workers, and remember their indices so that we can
         // figure out which Offer to reply to for each worker
-        val offerableIndices = new ArrayBuffer[Int]
         val offerableWorkers = new ArrayBuffer[WorkerOffer]
+        val offerableIndices = new HashMap[String, Int]
 
         def enoughMemory(o: Offer) = {
           val mem = getResource(o.getResourcesList, "mem")
@@ -195,7 +195,7 @@ private[spark] class MesosSchedulerBackend(
         }
 
         for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-          offerableIndices += index
+          offerableIndices.put(offer.getSlaveId.getValue, index)
           offerableWorkers += new WorkerOffer(
             offer.getSlaveId.getValue,
             offer.getHostname,
@@ -206,14 +206,13 @@ private[spark] class MesosSchedulerBackend(
         val taskLists = scheduler.resourceOffers(offerableWorkers)
 
         // Build a list of Mesos tasks for each slave
-        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+        val mesosTasks = offers.map(o => new JArrayList[MesosTaskInfo]())
         for ((taskList, index) <- taskLists.zipWithIndex) {
           if (!taskList.isEmpty) {
-            val offerNum = offerableIndices(index)
-            val slaveId = offers(offerNum).getSlaveId.getValue
-            slaveIdsWithExecutors += slaveId
-            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
             for (taskDesc <- taskList) {
+              val slaveId = taskDesc.executorId
+              val offerNum = offerableIndices(slaveId)
+              slaveIdsWithExecutors += slaveId
               taskIdToSlaveId(taskDesc.taskId) = slaveId
               mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
             }
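A related detail in the last hunk: the old code replaced a slot's `Collections.emptyList` placeholder with a fresh `JArrayList` before adding tasks, whereas the new code calls `mesosTasks(offerNum).add(...)` directly, so every slot must be mutable from the start. The small sketch below, with plain `String`s standing in for `MesosTaskInfo` and a hypothetical `addFails` helper, illustrates why the placeholder could not simply be kept.

```scala
import java.util.{ArrayList => JArrayList, Collections, List => JList}

object MutableSlotsDemo {
  // True if `list` rejects add(), as java.util.Collections.emptyList() does.
  def addFails(list: JList[String]): Boolean =
    try { list.add("task"); false }
    catch { case _: UnsupportedOperationException => true }

  def main(args: Array[String]): Unit = {
    // Old style: immutable placeholders that had to be replaced before use.
    val oldSlots = Seq("s1", "s2").map(_ => Collections.emptyList[String]())
    // New style: one mutable list per offer, ready to receive tasks directly.
    val newSlots = Seq("s1", "s2").map(_ => new JArrayList[String]())
    println(addFails(oldSlots.head)) // true
    println(addFails(newSlots.head)) // false
  }
}
```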
