Repository: spark
Updated Branches:
  refs/heads/master 25921110f -> f776bc988


[CORE] SPARK-2640: In "local[N]", free cores of the only executor should be 
touched by "spark.task.cpus" for every finish/start-up of tasks.

Make spark's "local[N]" better.
In our company, we use "local[N]" in production. It works exellentlly. It's our 
best choice.

Author: woshilaiceshide <[email protected]>

Closes #1544 from woshilaiceshide/localX and squashes the following commits:

6c85154 [woshilaiceshide] [CORE] SPARK-2640: In "local[N]", free cores of the 
only executor should be touched by "spark.task.cpus" for every finish/start-up 
of tasks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f776bc98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f776bc98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f776bc98

Branch: refs/heads/master
Commit: f776bc98878428940b5130c0d7d9b7ee452c0bd3
Parents: 2592111
Author: woshilaiceshide <[email protected]>
Authored: Wed Jul 23 11:05:41 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Wed Jul 23 11:05:41 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/local/LocalBackend.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f776bc98/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e9f6273..5b89759 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
     case StatusUpdate(taskId, state, serializedData) =>
       scheduler.statusUpdate(taskId, state, serializedData)
       if (TaskState.isFinished(state)) {
-        freeCores += 1
+        freeCores += scheduler.CPUS_PER_TASK
         reviveOffers()
       }
 
@@ -68,7 +68,7 @@ private[spark] class LocalActor(
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, 
freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= 1
+      freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, task.taskId, task.name, 
task.serializedTask)
     }
   }

Reply via email to