Repository: spark Updated Branches: refs/heads/branch-1.3 a64e097f1 -> f26e38234
[SPARK-7624] Revert "[SPARK-4939] revive offers periodically in LocalBackend" in 1.3 branch This reverts commit e196da840978b61b0222a5fc9b59b5511cf04606. Author: Davies Liu <[email protected]> Closes #6337 from davies/revert_revive and squashes the following commits: be73f96 [Davies Liu] Revert "[SPARK-4939] revive offers periodically in LocalBackend" Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f26e3823 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f26e3823 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f26e3823 Branch: refs/heads/branch-1.3 Commit: f26e382345f690a17c37065eb055a3fde1fe76eb Parents: a64e097 Author: Davies Liu <[email protected]> Authored: Fri May 22 16:00:01 2015 -0700 Committer: Davies Liu <[email protected]> Committed: Fri May 22 16:00:01 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/local/LocalBackend.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f26e3823/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 4676b82..05b6fa5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -19,8 +19,6 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer -import scala.concurrent.duration._ - import akka.actor.{Actor, ActorRef, Props} import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState} @@ -48,8 +46,6 @@ private[spark] class LocalActor( private val totalCores: Int) extends Actor with ActorLogReceive with Logging { - import context.dispatcher // to use Akka's scheduler.scheduleOnce() - private var freeCores = totalCores private val localExecutorId = SparkContext.DRIVER_IDENTIFIER @@ -78,16 +74,11 @@ private[spark] class LocalActor( def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) - val tasks = scheduler.resourceOffers(offers).flatten - for (task <- tasks) { + for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } - if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { - // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout - context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) - } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
