Repository: spark
Updated Branches:
  refs/heads/branch-1.0 8202276c9 -> adf8cdd0b


SPARK-1686: keep schedule() calling in the main thread

https://issues.apache.org/jira/browse/SPARK-1686

Moved from the original JIRA (by @markhamstra):

In deploy.master.Master, the completeRecovery method is the last thing to be 
called when a standalone Master is recovering from failure. It is responsible 
for resetting some state, relaunching drivers, and eventually resuming its 
scheduling duties.

There are currently four places in Master.scala where completeRecovery is 
called. Three of them are from within the actor's receive method, and aren't 
problems. The last starts from within receive when the ElectedLeader message is 
received, but the actual completeRecovery() call is made from the Akka 
scheduler. That means it executes on an Akka scheduler thread rather than the
actor's own thread, and the Master itself ends up running (i.e., calling
schedule()) from that scheduler thread.
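The threading problem can be shown in a minimal Scala sketch that uses plain java.util.concurrent executors instead of Akka. This is an illustrative model, not Spark code. The object ClosureOnSchedulerThread, the thread names "master-actor" and "akka-scheduler", and the 10 ms delay (standing in for WORKER_TIMEOUT) are all hypothetical. The closure handed to the scheduler runs on the scheduler's thread, not on the thread that was processing ElectedLeader.

```scala
import java.util.concurrent.{Callable, Executors, ScheduledFuture, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model, not Spark code: "master-actor" stands in for the Master
// actor's thread and "akka-scheduler" for context.system.scheduler.
object ClosureOnSchedulerThread {
  private def named(name: String): ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  /** Returns the name of the thread that ran the stand-in completeRecovery(). */
  def threadThatRanRecovery(): String = {
    val actor = Executors.newSingleThreadExecutor(named("master-actor"))
    val scheduler = Executors.newSingleThreadScheduledExecutor(named("akka-scheduler"))
    val ranOn = new AtomicReference[String]()

    // Stand-in for Master.completeRecovery(), which eventually calls schedule()
    def completeRecovery(): Unit = ranOn.set(Thread.currentThread().getName)

    try {
      // Handling ElectedLeader on the actor thread: like the old
      // scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }, it passes
      // a closure to the scheduler instead of a message to itself.
      val pending = actor.submit(new Callable[ScheduledFuture[_]] {
        def call(): ScheduledFuture[_] = scheduler.schedule(
          new Runnable { def run(): Unit = completeRecovery() },
          10, TimeUnit.MILLISECONDS)
      }).get()
      pending.get() // block until the scheduled closure has run
      ranOn.get()
    } finally {
      actor.shutdown()
      scheduler.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"completeRecovery() ran on: ${threadThatRanRecovery()}")
}
```

Calling threadThatRanRecovery() returns "akka-scheduler". In other words, the recovery work, and anything it calls such as schedule(), runs outside the actor's own thread.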

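In this PR, instead of having the Akka scheduler call completeRecovery()
directly, I have it send a CompleteRecovery message to the Master itself (self),
so that completeRecovery(), and therefore the "local" call of schedule(), runs
in the actor's own thread rather than on the scheduler thread.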
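The message-to-self approach can be sketched in Scala without Akka. This is a hypothetical model, not Spark or Akka code: MessageToSelf, MiniMaster, the thread names, and the delays are illustrative assumptions. A single-threaded executor plays the actor's mailbox, so "sending a message to self" means queueing it there. The scheduler thread only delivers CompleteRecovery, and the handler runs on the actor thread. stop() mirrors the postStop() change by cancelling a pending task that has not fired yet.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model, not Spark or Akka code: "master-actor" is the actor's
// mailbox and thread, "akka-scheduler" stands in for context.system.scheduler.
object MessageToSelf {
  sealed trait MasterMessage
  case object ElectedLeader extends MasterMessage
  case object CompleteRecovery extends MasterMessage

  private def named(name: String): ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  final class MiniMaster(recoveryDelayMs: Long) {
    private val mailbox = Executors.newSingleThreadExecutor(named("master-actor"))
    private val scheduler = Executors.newSingleThreadScheduledExecutor(named("akka-scheduler"))
    // Only touched on the actor thread, like recoveryCompletionTask in Master
    private var recoveryCompletionTask: ScheduledFuture[_] = null
    val recoveryRanOn = new AtomicReference[String]()
    val recovered = new CountDownLatch(1)

    // Like `self ! msg`: queue the message for processing on the actor thread
    def send(msg: MasterMessage): Unit =
      mailbox.execute(new Runnable { def run(): Unit = receive(msg) })

    private def receive(msg: MasterMessage): Unit = msg match {
      case ElectedLeader =>
        // Like scheduleOnce(delay, self, CompleteRecovery): the scheduler
        // thread only delivers a message and runs no Master logic itself
        recoveryCompletionTask = scheduler.schedule(
          new Runnable { def run(): Unit = send(CompleteRecovery) },
          recoveryDelayMs, TimeUnit.MILLISECONDS)
      case CompleteRecovery =>
        // completeRecovery(), and hence schedule(), now runs on the actor thread
        recoveryRanOn.set(Thread.currentThread().getName)
        recovered.countDown()
    }

    // Like postStop(): runs on the actor thread after already-queued messages
    // and cancels a CompleteRecovery delivery that has not fired yet
    def stop(): Unit = {
      mailbox.submit(new Runnable {
        def run(): Unit = if (recoveryCompletionTask != null) recoveryCompletionTask.cancel(false)
      }).get()
      scheduler.shutdown()
      mailbox.shutdown()
    }

    def recoveryCancelled: Boolean =
      recoveryCompletionTask != null && recoveryCompletionTask.isCancelled
  }

  /** Lets the timer fire; returns the thread that handled CompleteRecovery. */
  def recoveryThread(): String = {
    val master = new MiniMaster(recoveryDelayMs = 10)
    master.send(ElectedLeader)
    master.recovered.await(5, TimeUnit.SECONDS)
    master.stop()
    master.recoveryRanOn.get()
  }

  /** Stops before the timer fires; returns (task cancelled?, recovery ran?). */
  def stopBeforeRecovery(): (Boolean, Boolean) = {
    val master = new MiniMaster(recoveryDelayMs = 60000)
    master.send(ElectedLeader)
    master.stop()
    (master.recoveryCancelled, master.recoveryRanOn.get() != null)
  }

  def main(args: Array[String]): Unit = {
    println(s"CompleteRecovery handled on: ${recoveryThread()}")
    println(s"(cancelled, ran) after early stop: ${stopBeforeRecovery()}")
  }
}
```

recoveryThread() returns "master-actor", and stopBeforeRecovery() returns (true, false): the pending delivery was cancelled and CompleteRecovery never ran. Because every read and write of recoveryCompletionTask happens on the actor thread, the model needs no extra synchronization for it. This is the same property the message-to-self design gives the real Master's state.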

Author: CodingCat <[email protected]>

Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread

(cherry picked from commit 2f452cbaf35dbc609ab48ec0ee5e3dd7b6b9b790)
Signed-off-by: Aaron Davidson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adf8cdd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adf8cdd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adf8cdd0

Branch: refs/heads/branch-1.0
Commit: adf8cdd0b29731325f08552d050c43fe1bbd724f
Parents: 8202276
Author: CodingCat <[email protected]>
Authored: Fri May 9 21:50:23 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Fri May 9 21:52:40 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala      | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/adf8cdd0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fdb633b..f254f55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -104,6 +104,8 @@ private[spark] class Master(
 
   var leaderElectionAgent: ActorRef = _
 
+  private var recoveryCompletionTask: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -152,6 +154,10 @@ private[spark] class Master(
   }
 
   override def postStop() {
+    // Prevent the CompleteRecovery message from being sent to a restarted master
+    if (recoveryCompletionTask != null) {
+      recoveryCompletionTask.cancel()
+    }
     webUi.stop()
     fileSystemsUsed.foreach(_.close())
     masterMetricsSystem.stop()
@@ -171,10 +177,13 @@ private[spark] class Master(
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedDrivers, storedWorkers)
-        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+          CompleteRecovery)
       }
     }
 
+    case CompleteRecovery => completeRecovery()
+
     case RevokedLeadership => {
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
@@ -465,7 +474,7 @@ private[spark] class Master(
    * Schedule the currently available resources among waiting apps. This method will be called
    * every time a new app joins or resource availability changes.
    */
-  def schedule() {
+  private def schedule() {
     if (state != RecoveryState.ALIVE) { return }
 
     // First schedule drivers, they take strict precedence over applications
@@ -485,7 +494,7 @@ private[spark] class Master(
      // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
