Repository: spark
Updated Branches:
refs/heads/branch-2.0 31ea3c7bd -> 0d24fe09a
[SPARK-13902][SCHEDULER] Make DAGScheduler not to create duplicate stage.
## What changes were proposed in this pull request?
`DAGScheduler` sometimes generates an incorrect stage graph.
Suppose you have the following DAG:
```
[A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
         \                     /
           <------------------
```
Note: [] means an RDD, () means a shuffle dependency.
Here, RDD `B` has a shuffle dependency on RDD `A`, and RDD `C` has shuffle
dependencies on both `B` and `A`. The shuffle dependency IDs are numbers in the
`DAGScheduler`, but to make the example easier to understand, let's call the
shuffled data from `A` shuffle dependency ID `s_A` and the shuffled data from
`B` shuffle dependency ID `s_B`.
The `getAncestorShuffleDependencies` method in `DAGScheduler` (incorrectly)
does not check for duplicates when it adds ShuffleDependencies to the parents
data structure. For this DAG, when `getAncestorShuffleDependencies` is called
on `C` (the RDD that the final RDD `D` reads through `s_C`), it returns `s_A`,
`s_B`, `s_A`: `s_A` is added twice, once when the method visits RDD `C` and
once when it visits RDD `B`. This is problematic because this line of code:
https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289
then generates a new shuffle stage for each dependency returned by
`getAncestorShuffleDependencies`, resulting in duplicate map stages that
compute the map output from RDD `A`.
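To make the duplication concrete, here is a minimal, self-contained Scala
sketch of the traversal described above. It is a simplified model, not Spark's
code: the `Rdd` case class, `ancestorShuffleIds`, and the string shuffle IDs are
hypothetical stand-ins for Spark's `RDD`, `getAncestorShuffleDependencies`, and
numeric shuffle IDs. The model marks RDDs as visited but appends shuffle
dependencies without a duplicate check, so calling it on `C` yields `s_A` twice.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the example DAG (not Spark's classes):
// each RDD lists the shuffle dependencies it reads as (shuffleId, parentRdd).
final case class Rdd(name: String, shuffleDeps: List[(String, Rdd)])

object AncestorShuffleDeps {
  val a = Rdd("A", Nil)
  val b = Rdd("B", List("s_A" -> a))
  val c = Rdd("C", List("s_A" -> a, "s_B" -> b))

  // Depth-first walk with an explicit stack. Each RDD is visited at most once,
  // but shuffle IDs are appended without checking for duplicates, matching the
  // pre-fix behavior described in the text.
  def ancestorShuffleIds(rdd: Rdd): List[String] = {
    val visited = mutable.Set.empty[String]
    val parents = mutable.ListBuffer.empty[String]
    var waiting = List(rdd)
    while (waiting.nonEmpty) {
      val r = waiting.head
      waiting = waiting.tail
      if (visited.add(r.name)) { // add returns false if already visited
        for ((shuffleId, parent) <- r.shuffleDeps) {
          parents += shuffleId
          waiting = parent :: waiting
        }
      }
    }
    parents.toList
  }

  def main(args: Array[String]): Unit =
    println(ancestorShuffleIds(c)) // prints List(s_A, s_B, s_A)
}
```

`s_A` is collected once from `C` and again from `B`, even though RDD `A`
itself is visited only once.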
As a result, `DAGScheduler` generates the following stages, and their parent
stages, for each shuffle:

| shuffle dependency | stage | parents |
|----|----|----|
| s_A | ShuffleMapStage 2 | List() |
| s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) |
| - | ResultStage 4 | List(ShuffleMapStage 3) |

The stage for `s_A` should be `ShuffleMapStage 0`. However, the stage for `s_A`
is generated twice: `ShuffleMapStage 0` is overwritten by `ShuffleMapStage 2`,
while `ShuffleMapStage 1` (the stage for `s_B`) still refers to the old
`ShuffleMapStage 0`.
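This patch fixes the problem: when registering ancestor shuffle dependencies,
`DAGScheduler` now creates a map stage for a dependency only if
`shuffleToMapStage` does not already contain a stage for its shuffle ID.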
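A second sketch, under the same caveats, models registering a map stage for
each entry of the duplicated list `s_A`, `s_B`, `s_A`. `Stage`,
`inputShuffles`, and `register` are hypothetical simplifications: stage IDs come
from a counter, and a stage's parents are looked up in the registry when the
stage is created. Without the guard, the second `s_A` entry replaces stage 0
with stage 2 while the stage for `s_B` keeps pointing at stage 0, as in the
table above; with the guard, analogous to the `contains` check in the diff
below, each shuffle gets exactly one stage.

```scala
import scala.collection.mutable

// Hypothetical, simplified stage model (not Spark's ShuffleMapStage).
final case class Stage(id: Int, shuffleId: String, parents: List[Stage])

object StageRegistration {
  // Shuffles read by the RDD that each shuffle writes out, following the
  // example DAG: s_B's input B reads s_A; s_C's input C reads s_A and s_B.
  val inputShuffles: Map[String, List[String]] =
    Map("s_A" -> Nil, "s_B" -> List("s_A"), "s_C" -> List("s_A", "s_B"))

  // Registers a stage for every entry of `ancestors`, then one for `target`.
  // With guard = true, an already-registered shuffle is skipped, analogous
  // to the `contains` check added by this patch.
  def register(ancestors: List[String], target: String, guard: Boolean): Map[String, Stage] = {
    val shuffleToMapStage = mutable.Map.empty[String, Stage]
    var nextId = 0
    def newStage(shuffleId: String): Stage = {
      // Parents are resolved from the registry at creation time.
      val parents = inputShuffles(shuffleId).map(id => shuffleToMapStage(id))
      val stage = Stage(nextId, shuffleId, parents)
      nextId += 1
      stage
    }
    for (dep <- ancestors) {
      if (!guard || !shuffleToMapStage.contains(dep)) {
        shuffleToMapStage(dep) = newStage(dep)
      }
    }
    shuffleToMapStage(target) = newStage(target)
    shuffleToMapStage.toMap
  }

  def main(args: Array[String]): Unit = {
    val ancestors = List("s_A", "s_B", "s_A") // duplicated list from the text
    for (guard <- Seq(false, true)) {
      val stages = register(ancestors, "s_C", guard)
      val registeredA = stages("s_A").id
      val parentsOfB = stages("s_B").parents.map(_.id)
      println(s"guard=$guard: s_A -> stage $registeredA, s_B parents -> $parentsOfB")
    }
  }
}
```

Without the guard, the model prints stage 2 for `s_A` but `List(0)` as the
parents of the `s_B` stage, so two stages would compute the map output of `A`.
With the guard, `s_A` stays at stage 0 and the `s_B` stage refers to it.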
## How was this patch tested?
I added a test to `DAGSchedulerSuite` that builds the sample RDD graph above,
which previously produced an illegal stage graph, and checks the resulting
stages and their parents.
Author: Takuya UESHIN <[email protected]>
Closes #12655 from ueshin/issues/SPARK-13902.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d24fe09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d24fe09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d24fe09
Branch: refs/heads/branch-2.0
Commit: 0d24fe09aa5bafe85bf694dc4e2e2ebbfb3af250
Parents: 31ea3c7
Author: Takuya UESHIN <[email protected]>
Authored: Thu May 12 12:36:18 2016 -0700
Committer: Kay Ousterhout <[email protected]>
Committed: Thu May 12 14:05:48 2016 -0700
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 4 +-
.../spark/scheduler/DAGSchedulerSuite.scala | 47 ++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0d24fe09/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4dfd532..5291b66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -286,7 +286,9 @@ class DAGScheduler(
case None =>
// We are going to register ancestor shuffle dependencies
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
- shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+ if (!shuffleToMapStage.contains(dep.shuffleId)) {
+ shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+ }
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
http://git-wip-us.apache.org/repos/asf/spark/blob/0d24fe09/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e3ed079..088a476 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -325,6 +325,53 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}
+ /**
+ * This test ensures that DAGScheduler build stage graph correctly.
+ *
+ * Suppose you have the following DAG:
+ *
+ * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
+ *          \                     /
+ *            <------------------
+ *
+ * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both
+ * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example
+ * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the
+ * shuffled data from B shuffle dependency ID s_B.
+ * shuffled data from B shuffle dependency ID s_B.
+ *
+ * Note: [] means an RDD, () means a shuffle dependency.
+ */
+ test("[SPARK-13902] Ensure no duplicate stages are created") {
+ val rddA = new MyRDD(sc, 1, Nil)
+ val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
+ val s_A = shuffleDepA.shuffleId
+
+ val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
+ val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
+ val s_B = shuffleDepB.shuffleId
+
+ val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
+ val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
+ val s_C = shuffleDepC.shuffleId
+
+ val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)
+
+ submit(rddD, Array(0))
+
+ assert(scheduler.shuffleToMapStage.size === 3)
+ assert(scheduler.activeJobs.size === 1)
+
+ val mapStageA = scheduler.shuffleToMapStage(s_A)
+ val mapStageB = scheduler.shuffleToMapStage(s_B)
+ val mapStageC = scheduler.shuffleToMapStage(s_C)
+ val finalStage = scheduler.activeJobs.head.finalStage
+
+ assert(mapStageA.parents.isEmpty)
+ assert(mapStageB.parents === List(mapStageA))
+ assert(mapStageC.parents === List(mapStageA, mapStageB))
+ assert(finalStage.parents === List(mapStageC))
+ }
+
test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None