This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7a4cf33 [SPARK-30388][CORE] Mark running map stages of finished job
as finished, and cancel running tasks
7a4cf33 is described below
commit 7a4cf339d7082b576624940253e8283de9e83e19
Author: xuesenliang <[email protected]>
AuthorDate: Tue Mar 3 09:29:43 2020 -0600
[SPARK-30388][CORE] Mark running map stages of finished job as finished,
and cancel running tasks
### What changes were proposed in this pull request?
When a job finishes, its running (re-submitted) map stages should be marked
as finished if they are not used by other jobs, and the running tasks of these
stages should be cancelled.
The ListenerBus should be notified too; otherwise, these map stage
items stay on the "Active Stages" page of the web UI and never go away.
For example:
Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0
has two partitions, and its result stage 1 has two partitions too.
**Steps to reproduce the bug:**
1. map stage 0: task 0 (```TID 0```) and task 1 (```TID 1```) start, then
both finish successfully.
2. result stage 1: task 0 (```TID 2```) and task 1 (```TID 3```) start.
3. result stage 1: task 0 (```TID 2```) finishes successfully.
4. result stage 1: speculative task 1.1 (```TID 4```) is launched, but then
fails due to FetchFailedException.
5. The driver re-submits map stage 0 and result stage 1.
6. map stage 0 (retry 1): task 0 (```TID 5```) is launched.
7. result stage 1: task 1 (```TID 3```) finishes successfully, so job 0
finishes.
8. map stage 0 is removed from ```runningStages``` and
```stageIdToStage```, because it no longer belongs to any job.
```
private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = {
  ...
  stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
    case (stageId, stage) =>
      ...
      def removeStage(stageId: Int): Unit = {
        for (stage <- stageIdToStage.get(stageId)) {
          if (runningStages.contains(stage)) {
            logDebug("Removing running stage %d".format(stageId))
            runningStages -= stage
          }
          ...
        }
        stageIdToStage -= stageId
      }
      jobSet -= job.jobId
      if (jobSet.isEmpty) { // no other job needs this stage
        removeStage(stageId)
      }
  }
  ...
}
```
9. map stage 0 (retry 1): task 0 (```TID 5```) finishes successfully, but its
stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is
never called for the stage.
```
private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = {
  val task = event.task
  val stageId = task.stageId
  ...
  if (!stageIdToStage.contains(task.stageId)) {
    postTaskEnd(event)
    // Skip all the actions if the stage has been cancelled.
    return
  }
  ...
```
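The steps above can be condensed into a toy model. This is only a sketch, not the DAGScheduler code: `ToyStage`, `ToyScheduler`, and the `cancelOnJobFinish` flag are hypothetical names, and task cancellation is reduced to bookkeeping. It assumes that a "stage completed" event is posted only from `markStageAsFinished`, which is how the excerpts above behave.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stage; a plain class, so set membership uses identity.
final class ToyStage(val id: Int, initialJobIds: Set[Int]) {
  val jobIds: mutable.HashSet[Int] = mutable.HashSet(initialJobIds.toSeq: _*)
}

// Hypothetical stand-in for the scheduler's bookkeeping.
// cancelOnJobFinish = false models the behavior described in steps 8 and 9;
// cancelOnJobFinish = true models the behavior proposed in this pull request.
final class ToyScheduler(cancelOnJobFinish: Boolean) {
  val stageIdToStage = mutable.HashMap.empty[Int, ToyStage]
  val runningStages = mutable.HashSet.empty[ToyStage]
  // Stage ids for which a "stage completed" event reached the listeners.
  val completedEvents = mutable.ArrayBuffer.empty[Int]

  def submitStage(stage: ToyStage): Unit = {
    stageIdToStage(stage.id) = stage
    runningStages += stage
  }

  private def markStageAsFinished(stage: ToyStage): Unit = {
    runningStages -= stage
    completedEvents += stage.id
  }

  def jobFinished(jobId: Int): Unit = {
    val stages = stageIdToStage.values.filter(_.jobIds.contains(jobId)).toList
    for (stage <- stages) {
      val usedOnlyByThisJob = stage.jobIds.size == 1
      if (cancelOnJobFinish && usedOnlyByThisJob && runningStages.contains(stage)) {
        // Proposed behavior: finish the stage (and notify listeners) before cleanup.
        markStageAsFinished(stage)
      }
      // Cleanup: a stage that no job needs any more is forgotten silently.
      stage.jobIds -= jobId
      if (stage.jobIds.isEmpty) {
        runningStages -= stage
        stageIdToStage -= stage.id
      }
    }
  }

  def lastTaskCompleted(stageId: Int): Unit = stageIdToStage.get(stageId) match {
    case None => () // stage already removed: skip all further actions
    case Some(stage) => if (runningStages.contains(stage)) markStageAsFinished(stage)
  }
}

object ToySchedulerDemo {
  // Runs the late-completion scenario and returns the stage ids that were reported as completed.
  def run(cancelOnJobFinish: Boolean): List[Int] = {
    val scheduler = new ToyScheduler(cancelOnJobFinish)
    scheduler.submitStage(new ToyStage(0, Set(0))) // map stage 0 (retry 1), used only by job 0
    scheduler.jobFinished(0)                       // step 7: job 0 finishes first
    scheduler.lastTaskCompleted(0)                 // step 9: TID 5 finishes afterwards
    scheduler.completedEvents.toList
  }
}
```

With `cancelOnJobFinish = false` no completion event is ever recorded for stage 0; with `true` the event is recorded when the job finishes, and the late task completion is then ignored.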
#### The relevant Spark driver logs are as follows:
```
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at
NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at
NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
0)
20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no
missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0,
1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2
tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID
0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at
NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose
tasks have all completed, from pool
20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1
(MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no
missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from
ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0)
(first 15 tasks are for partitions Vector(0, 1))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2
tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID
2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on
9.76.13.26) as speculatable because it ran more than 158 ms
20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID
4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4,
9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337,
None), shuffleId=0, mapId=0, reduceId=1,
message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4)
failed, but the task will not be re-executed (either because the task failed
with a shuffle data fetch failure, so the previous stage needs to be re-run, or
because a different copy of the task has already succeeded).
20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at
NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from
ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at
NativeMethodAccessorImpl.java:0) failed in 0.261 s due to
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main
at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at
NativeMethodAccessorImpl.java:0) due to fetch failure
20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no
missing parents
20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1
tasks
20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID
5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
// NOTE: Here should be "INFO TaskSetManager: Finished task 1.0 in stage
1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
// and this bug is being fixed in
https://issues.apache.org/jira/browse/SPARK-30404
20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0
in stage 1.0 because task 1 has already completed successfully
20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose
tasks have all completed, from pool
20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at
NativeMethodAccessorImpl.java:0) finished in 10.131 s
20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at
NativeMethodAccessorImpl.java:0, took 51.031212 s
20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID
5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose
tasks have all completed, from pool
```
### Why are the changes needed?
The web UI is incorrect: ```stage 0 (retry 1)``` has finished, but it stays on
the ```Active Stages``` page.
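
As a rough illustration of why a missing event leaves a stage on that page, consider a listener that derives the active set purely from submitted and completed events. This is a minimal sketch and not the web UI's actual code; `ToyEvent`, `ToyStageSubmitted`, `ToyStageCompleted`, and `ToyStagePage` are hypothetical names, and the assumption is that a stage attempt counts as active from its submitted event until its completed event.

```scala
import scala.collection.mutable

// Hypothetical events keyed by (stageId, attemptNumber).
sealed trait ToyEvent
final case class ToyStageSubmitted(stageId: Int, attempt: Int) extends ToyEvent
final case class ToyStageCompleted(stageId: Int, attempt: Int) extends ToyEvent

// Hypothetical page model: active = submitted and not yet completed.
final class ToyStagePage {
  private val active = mutable.LinkedHashSet.empty[(Int, Int)]

  def onEvent(event: ToyEvent): Unit = event match {
    case ToyStageSubmitted(id, attempt) => active += ((id, attempt))
    case ToyStageCompleted(id, attempt) => active -= ((id, attempt))
  }

  def activeStages: List[(Int, Int)] = active.toList
}

object ToyStagePageDemo {
  // Event sequence loosely following the scenario in this description.
  val eventsWithoutFinalCompletion: Seq[ToyEvent] = Seq(
    ToyStageSubmitted(0, 0), ToyStageCompleted(0, 0), // map stage 0 succeeds
    ToyStageSubmitted(1, 0), ToyStageCompleted(1, 0), // result stage 1 ends when the job finishes
    ToyStageSubmitted(0, 1)                           // map stage 0 (retry 1) is re-submitted
  )

  def activeAfter(events: Seq[ToyEvent]): List[(Int, Int)] = {
    val page = new ToyStagePage
    events.foreach(page.onEvent)
    page.activeStages
  }
}
```

In this model, stage 0 (retry 1) stays active until a completed event for it arrives, so a scheduler that never posts that event leaves the entry on the page indefinitely.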

### Does this PR introduce any user-facing change?
No
### How was this patch tested?
A new test case is added.
It was also tested manually on a cluster. The result is as follows:

Closes #27050 from liangxs/master.
Authored-by: xuesenliang <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../org/apache/spark/scheduler/DAGScheduler.scala | 27 +++++++------
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 44 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fd5c3e0..a226b65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1434,6 +1434,7 @@ private[spark] class DAGScheduler(
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
+ cancelRunningIndependentStages(job, s"Job ${job.jobId} is
finished.")
cleanupStateForJobAndIndependentStages(job)
try {
// killAllTaskAttempts will fail if a SchedulerBackend
does not implement
@@ -1978,18 +1979,12 @@ private[spark] class DAGScheduler(
}
}
- /** Fails a job and all stages that are only used by that job, and cleans up
relevant state. */
- private def failJobAndIndependentStages(
- job: ActiveJob,
- failureReason: String,
- exception: Option[Throwable] = None): Unit = {
- val error = new SparkException(failureReason, exception.orNull)
+ /** Cancel all independent, running stages that are only used by this job. */
+ private def cancelRunningIndependentStages(job: ActiveJob, reason: String):
Boolean = {
var ableToCancelStages = true
-
- // Cancel all independent, running stages.
val stages = jobIdToStageIds(job.jobId)
if (stages.isEmpty) {
- logError("No stages registered for job " + job.jobId)
+ logError(s"No stages registered for job ${job.jobId}")
}
stages.foreach { stageId =>
val jobsForStage: Option[HashSet[Int]] =
stageIdToStage.get(stageId).map(_.jobIds)
@@ -2001,12 +1996,12 @@ private[spark] class DAGScheduler(
if (!stageIdToStage.contains(stageId)) {
logError(s"Missing Stage for stage with id $stageId")
} else {
- // This is the only job that uses this stage, so fail the stage if
it is running.
+ // This stage is only used by the job, so finish the stage if it is
running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not
implement killTask
taskScheduler.cancelTasks(stageId,
shouldInterruptTaskThread(job))
- markStageAsFinished(stage, Some(failureReason))
+ markStageAsFinished(stage, Some(reason))
} catch {
case e: UnsupportedOperationException =>
logWarning(s"Could not cancel tasks for stage $stageId", e)
@@ -2016,11 +2011,19 @@ private[spark] class DAGScheduler(
}
}
}
+ ableToCancelStages
+ }
- if (ableToCancelStages) {
+ /** Fails a job and all stages that are only used by that job, and cleans up
relevant state. */
+ private def failJobAndIndependentStages(
+ job: ActiveJob,
+ failureReason: String,
+ exception: Option[Throwable] = None): Unit = {
+ if (cancelRunningIndependentStages(job, failureReason)) {
// SPARK-15783 important to cleanup state first, just for tests where we
have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in
the tests.
cleanupStateForJobAndIndependentStages(job)
+ val error = new SparkException(failureReason, exception.orNull)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(),
JobFailed(error)))
}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e40b63f..2b2fd32 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1933,6 +1933,50 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}
+ test("shuffle fetch failed on speculative task, but original task succeed
(SPARK-30388)") {
+ var completedStage: List[Int] = Nil
+ val listener = new SparkListener() {
+ override def onStageCompleted(event: SparkListenerStageCompleted): Unit
= {
+ completedStage = completedStage :+ event.stageInfo.stageId
+ }
+ }
+ sc.addSparkListener(listener)
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new
HashPartitioner(2))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ submit(reduceRdd, Array(0, 1))
+ completeShuffleMapStageSuccessfully(0, 0, 2)
+ assert(completedStage === List(0))
+
+ // result task 0.0 succeed
+ runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42))
+ // speculative result task 1.1 fetch failed
+ val info = new TaskInfo(4, index = 1, attemptNumber = 1, 0L, "", "",
TaskLocality.ANY, true)
+ runEvent(makeCompletionEvent(
+ taskSets(1).tasks(1),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0,
1, "ignored"),
+ null,
+ Seq.empty,
+ Array.empty,
+ info
+ )
+ )
+ assert(completedStage === List(0, 1))
+
+ Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+ // map stage resubmitted
+ assert(scheduler.runningStages.size === 1)
+ val mapStage = scheduler.runningStages.head
+ assert(mapStage.id === 0)
+ assert(mapStage.latestInfo.failureReason.isEmpty)
+
+ // original result task 1.0 succeed
+ runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
+ assert(completedStage === List(0, 1, 1, 0))
+ assert(scheduler.activeJobs.isEmpty)
+ }
+
test("misbehaved accumulator should not crash DAGScheduler and
SparkContext") {
val acc = new LongAccumulator {
override def add(v: java.lang.Long): Unit = throw new
DAGSchedulerSuiteDummyException