Repository: spark
Updated Branches:
  refs/heads/master 4b4e329e4 -> fb2008431


[SPARK-17304] Fix perf. issue caused by TaskSetManager.abortIfCompletelyBlacklisted

This patch addresses a minor scheduler performance issue that was introduced in 
#13603. If you run

```
sc.parallelize(1 to 100000, 100000).map(identity).count()
```

then most of the time ends up being spent in 
`TaskSetManager.abortIfCompletelyBlacklisted()`:

![image](https://cloud.githubusercontent.com/assets/50748/18071032/428732b0-6e07-11e6-88b2-c9423cd61f53.png)

When processing resource offers, the scheduler uses a nested loop which 
considers every task set at multiple locality levels:

```scala
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
```

To prevent jobs with globally blacklisted tasks from hanging, #13603 added a
`taskSet.abortIfCompletelyBlacklisted` call inside
`resourceOfferSingleTaskSet`: if a call to `resourceOfferSingleTaskSet` fails
to schedule any tasks, then `abortIfCompletelyBlacklisted` checks whether the
tasks are completely blacklisted in order to figure out whether they will ever
be schedulable. The problem with this placement is that the last call to
`resourceOfferSingleTaskSet` in each `while` loop returns `false` (that is what
ends the loop), so that call always reaches `abortIfCompletelyBlacklisted`.
As a result, almost every call to `resourceOffers` triggers the
`abortIfCompletelyBlacklisted` check for every task set.

Instead, this patch moves the call out of the innermost loop so that it runs
_at most_ once per task set, and only when none of the task set's tasks could
be scheduled at any locality level.
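
To make the difference concrete, here is a minimal standalone sketch that counts how often the check runs under each placement. It is a simplified model, not Spark's code: `FakeTaskSet`, `offerOnce`, and both `checksWith...` helpers are hypothetical stand-ins, a single counter replaces real resource offers, and a plain `while` loop replaces the `do`/`while` so the snippet also compiles on Scala 3.

```scala
object OfferLoopSketch {
  // Hypothetical stand-in for a TaskSetManager; it only counts check invocations.
  final class FakeTaskSet(val localityLevels: Seq[String], var pendingTasks: Int) {
    var blacklistChecks = 0
    def abortIfCompletelyBlacklisted(): Unit = blacklistChecks += 1
  }

  // Hypothetical stand-in for resourceOfferSingleTaskSet: launches one task
  // if both a pending task and a free slot remain, and reports whether it did.
  private def offerOnce(ts: FakeTaskSet, freeSlots: Array[Int]): Boolean =
    if (ts.pendingTasks > 0 && freeSlots(0) > 0) {
      ts.pendingTasks -= 1
      freeSlots(0) -= 1
      true
    } else false

  // Old placement: the check runs every time a single offer launches nothing,
  // which happens at least once per locality level (the loop-ending call).
  def checksWithOldPlacement(levels: Seq[String], pending: Int, slots: Int): Int = {
    val ts = new FakeTaskSet(levels, pending)
    val freeSlots = Array(slots)
    for (_ <- ts.localityLevels) {
      var launched = true
      while (launched) {
        launched = offerOnce(ts, freeSlots)
        if (!launched) ts.abortIfCompletelyBlacklisted()
      }
    }
    ts.blacklistChecks
  }

  // New placement: the check runs at most once per task set, and only if
  // nothing was launched at any locality level.
  def checksWithNewPlacement(levels: Seq[String], pending: Int, slots: Int): Int = {
    val ts = new FakeTaskSet(levels, pending)
    val freeSlots = Array(slots)
    var launchedAny = false
    for (_ <- ts.localityLevels) {
      var launched = true
      while (launched) {
        launched = offerOnce(ts, freeSlots)
        launchedAny |= launched
      }
    }
    if (!launchedAny) ts.abortIfCompletelyBlacklisted()
    ts.blacklistChecks
  }

  def main(args: Array[String]): Unit = {
    val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")
    println(checksWithOldPlacement(levels, pending = 3, slots = 2))
    println(checksWithNewPlacement(levels, pending = 3, slots = 2))
  }
}
```

In this model, with five locality levels, three pending tasks, and two free slots, the old placement performs five checks and the new placement performs none. With zero free slots the counts are five and one, respectively.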

Before this patch, the microbenchmark example that I posted above took 35
seconds to run; with this change it takes 15 seconds.

/cc squito and kayousterhout for review.

Author: Josh Rosen <[email protected]>

Closes #14871 from JoshRosen/bail-early-if-no-cpus.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb200843
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb200843
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb200843

Branch: refs/heads/master
Commit: fb20084313470593d8507a43fcb2cde2a4c854d9
Parents: 4b4e329
Author: Josh Rosen <[email protected]>
Authored: Tue Aug 30 13:15:21 2016 -0700
Committer: Josh Rosen <[email protected]>
Committed: Tue Aug 30 13:15:21 2016 -0700

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     | 22 ++++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb200843/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index dc05e76..7d90553 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -278,9 +278,6 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    if (!launchedTask) {
-      taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
-    }
     return launchedTask
   }
 
@@ -326,12 +323,19 @@ private[spark] class TaskSchedulerImpl(
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
-    var launchedTask = false
-    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
-      do {
-        launchedTask = resourceOfferSingleTaskSet(
-            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
-      } while (launchedTask)
+    for (taskSet <- sortedTaskSets) {
+      var launchedAnyTask = false
+      var launchedTaskAtCurrentMaxLocality = false
+      for (currentMaxLocality <- taskSet.myLocalityLevels) {
+        do {
+          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
+            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
+          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+        } while (launchedTaskAtCurrentMaxLocality)
+      }
+      if (!launchedAnyTask) {
+        taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+      }
     }
 
     if (tasks.size > 0) {

