Repository: spark Updated Branches: refs/heads/master 208fbca10 -> cf4213864
[SPARK-10003] Improve readability of DAGScheduler Note: this is not intended to be in Spark 1.5! This patch rewrites some code in the `DAGScheduler` to make it more readable. In particular - there were blocks of code that are unnecessary and removed for simplicity - there were abstractions that are unnecessary and made the code hard to navigate - other minor changes Author: Andrew Or <[email protected]> Closes #8217 from andrewor14/dag-scheduler-readability and squashes the following commits: 57abca3 [Andrew Or] Move comment back into if case 574fb1e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-scheduler-readability 64a9ed2 [Andrew Or] Remove unnecessary code + minor code rewrites Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf421386 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf421386 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf421386 Branch: refs/heads/master Commit: cf42138643d1d4bf464f1d700457309d9e537721 Parents: 208fbca Author: Andrew Or <[email protected]> Authored: Thu Sep 3 17:55:10 2015 -0700 Committer: Kay Ousterhout <[email protected]> Committed: Thu Sep 3 17:55:10 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 46 ++++---------------- 1 file changed, 9 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cf421386/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d673cb0..09e963f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -250,11 +250,12 @@ class DAGScheduler( case Some(stage) => stage case None => // We are going to register ancestor shuffle dependencies - registerShuffleDependencies(shuffleDep, firstJobId) + getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) shuffleToMapStage(shuffleDep.shuffleId) = stage - stage } } @@ -365,16 +366,6 @@ class DAGScheduler( parents.toList } - /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */ - private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) { - val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) - while (parentsWithNoMapStage.nonEmpty) { - val currentShufDep = parentsWithNoMapStage.pop() - val stage = newOrUsedShuffleStage(currentShufDep, firstJobId) - shuffleToMapStage(currentShufDep.shuffleId) = stage - } - } - /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { val parents = new Stack[ShuffleDependency[_, _, _]] @@ -391,11 +382,9 @@ class DAGScheduler( if (!shuffleToMapStage.contains(shufDep.shuffleId)) { parents.push(shufDep) } - - waitingForVisit.push(shufDep.rdd) case _ => - waitingForVisit.push(dep.rdd) } + waitingForVisit.push(dep.rdd) } } } @@ -1052,10 +1041,11 @@ class DAGScheduler( // we registered these map outputs. mapOutputTracker.registerMapOutputs( shuffleStage.shuffleDep.shuffleId, - shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head), + shuffleStage.outputLocs.map(_.headOption.orNull), changeEpoch = true) clearCacheLocs() + if (shuffleStage.outputLocs.contains(Nil)) { // Some tasks had failed; let's resubmit this shuffleStage // TODO: Lower-level scheduler should also deal with this @@ -1064,27 +1054,9 @@ class DAGScheduler( shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty) .map(_._2).mkString(", ")) submitStage(shuffleStage) - } else { - val newlyRunnable = new ArrayBuffer[Stage] - for (shuffleStage <- waitingStages) { - logInfo("Missing parents for " + shuffleStage + ": " + - getMissingParentStages(shuffleStage)) - } - for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty) - { - newlyRunnable += shuffleStage - } - waitingStages --= newlyRunnable - runningStages ++= newlyRunnable - for { - shuffleStage <- newlyRunnable.sortBy(_.id) - jobId <- activeJobForStage(shuffleStage) - } { - logInfo("Submitting " + shuffleStage + " (" + - shuffleStage.rdd + "), which is now runnable") - submitMissingTasks(shuffleStage, jobId) - } } + + // Note: newly runnable stages will be submitted below when we submit waiting stages } } @@ -1186,7 +1158,7 @@ class DAGScheduler( // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) - val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head) + val locs = stage.outputLocs.map(_.headOption.orNull) mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) } if (shuffleToMapStage.isEmpty) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
