Repository: spark
Updated Branches:
  refs/heads/master 208fbca10 -> cf4213864


[SPARK-10003] Improve readability of DAGScheduler

Note: this is not intended to be in Spark 1.5!

This patch rewrites some code in the `DAGScheduler` to make it more readable. 
In particular
- there were blocks of code that are unnecessary and removed for simplicity
- there were abstractions that are unnecessary and made the code hard to 
navigate
- other minor changes

Author: Andrew Or <[email protected]>

Closes #8217 from andrewor14/dag-scheduler-readability and squashes the 
following commits:

57abca3 [Andrew Or] Move comment back into if case
574fb1e [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
dag-scheduler-readability
64a9ed2 [Andrew Or] Remove unnecessary code + minor code rewrites


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf421386
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf421386
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf421386

Branch: refs/heads/master
Commit: cf42138643d1d4bf464f1d700457309d9e537721
Parents: 208fbca
Author: Andrew Or <[email protected]>
Authored: Thu Sep 3 17:55:10 2015 -0700
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Sep 3 17:55:10 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 46 ++++----------------
 1 file changed, 9 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf421386/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d673cb0..09e963f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -250,11 +250,12 @@ class DAGScheduler(
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
-        registerShuffleDependencies(shuffleDep, firstJobId)
+        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
+          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, 
firstJobId)
+        }
         // Then register current shuffleDep
         val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
-
         stage
     }
   }
@@ -365,16 +366,6 @@ class DAGScheduler(
     parents.toList
   }
 
-  /** Find ancestor missing shuffle dependencies and register into 
shuffleToMapStage */
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, 
_], firstJobId: Int) {
-    val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
-    while (parentsWithNoMapStage.nonEmpty) {
-      val currentShufDep = parentsWithNoMapStage.pop()
-      val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
-      shuffleToMapStage(currentShufDep.shuffleId) = stage
-    }
-  }
-
   /** Find ancestor shuffle dependencies that are not registered in 
shuffleToMapStage yet */
   private def getAncestorShuffleDependencies(rdd: RDD[_]): 
Stack[ShuffleDependency[_, _, _]] = {
     val parents = new Stack[ShuffleDependency[_, _, _]]
@@ -391,11 +382,9 @@ class DAGScheduler(
               if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
                 parents.push(shufDep)
               }
-
-              waitingForVisit.push(shufDep.rdd)
             case _ =>
-              waitingForVisit.push(dep.rdd)
           }
+          waitingForVisit.push(dep.rdd)
         }
       }
     }
@@ -1052,10 +1041,11 @@ class DAGScheduler(
               //       we registered these map outputs.
               mapOutputTracker.registerMapOutputs(
                 shuffleStage.shuffleDep.shuffleId,
-                shuffleStage.outputLocs.map(list => if (list.isEmpty) null 
else list.head),
+                shuffleStage.outputLocs.map(_.headOption.orNull),
                 changeEpoch = true)
 
               clearCacheLocs()
+
               if (shuffleStage.outputLocs.contains(Nil)) {
                 // Some tasks had failed; let's resubmit this shuffleStage
                 // TODO: Lower-level scheduler should also deal with this
@@ -1064,27 +1054,9 @@ class DAGScheduler(
                   shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
                       .map(_._2).mkString(", "))
                 submitStage(shuffleStage)
-              } else {
-                val newlyRunnable = new ArrayBuffer[Stage]
-                for (shuffleStage <- waitingStages) {
-                  logInfo("Missing parents for " + shuffleStage + ": " +
-                    getMissingParentStages(shuffleStage))
-                }
-                for (shuffleStage <- waitingStages if 
getMissingParentStages(shuffleStage).isEmpty)
-                {
-                  newlyRunnable += shuffleStage
-                }
-                waitingStages --= newlyRunnable
-                runningStages ++= newlyRunnable
-                for {
-                  shuffleStage <- newlyRunnable.sortBy(_.id)
-                  jobId <- activeJobForStage(shuffleStage)
-                } {
-                  logInfo("Submitting " + shuffleStage + " (" +
-                    shuffleStage.rdd + "), which is now runnable")
-                  submitMissingTasks(shuffleStage, jobId)
-                }
               }
+
+              // Note: newly runnable stages will be submitted below when we 
submit waiting stages
             }
           }
 
@@ -1186,7 +1158,7 @@ class DAGScheduler(
         // TODO: This will be really slow if we keep accumulating shuffle map 
stages
         for ((shuffleId, stage) <- shuffleToMapStage) {
           stage.removeOutputsOnExecutor(execId)
-          val locs = stage.outputLocs.map(list => if (list.isEmpty) null else 
list.head)
+          val locs = stage.outputLocs.map(_.headOption.orNull)
           mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = 
true)
         }
         if (shuffleToMapStage.isEmpty) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to