This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 38263f6 [SPARK-27630][CORE] Properly handle task end events from completed stages
38263f6 is described below
commit 38263f6d153944b6f2f0248d9284861fc82532d6
Author: sychen <[email protected]>
AuthorDate: Tue Jun 25 14:30:13 2019 -0500
[SPARK-27630][CORE] Properly handle task end events from completed stages
## What changes were proposed in this pull request?
Track tasks separately for each stage attempt (instead of tracking by
stage), and do NOT reset `numRunningTasks` to 0 on `StageCompleted`.
In the case of a stage retry, the `taskEnd` event from the zombie stage
sometimes makes the number of `totalRunningTasks` negative, which causes
the job to get stuck.
A similar problem also exists with `stageIdToTaskIndices` and
`stageIdToSpeculativeTaskIndices`.
A failed `taskEnd` event from the zombie stage causes
`stageIdToTaskIndices` or `stageIdToSpeculativeTaskIndices` to remove the task
index of the active stage, so the number of `totalPendingTasks` increases
unexpectedly.
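To make the per-attempt bookkeeping concrete, here is a minimal, self-contained Scala sketch (hypothetical names, not the patched `ExecutorAllocationManager` code) of the idea: running-task counts are keyed by `(stageId, stageAttemptId)`, so a late `taskEnd` from a zombie attempt only decrements that attempt's own counter and the total never goes negative.
```scala
import scala.collection.mutable

// Hypothetical illustration only: track running tasks per stage attempt instead of per stage.
object StageAttemptTrackingSketch {
  case class StageAttempt(stageId: Int, stageAttemptId: Int)

  private val runningTasks = new mutable.HashMap[StageAttempt, Int]

  def onTaskStart(stageId: Int, attemptId: Int): Unit = {
    val key = StageAttempt(stageId, attemptId)
    runningTasks(key) = runningTasks.getOrElse(key, 0) + 1
  }

  def onTaskEnd(stageId: Int, attemptId: Int): Unit = {
    val key = StageAttempt(stageId, attemptId)
    // Decrement only the attempt the task belonged to; drop the entry once it reaches zero.
    runningTasks.get(key).foreach { n =>
      if (n <= 1) runningTasks -= key else runningTasks(key) = n - 1
    }
  }

  // Counts tasks from zombie and non-zombie attempts alike.
  def totalRunningTasks(): Int = runningTasks.values.sum

  def main(args: Array[String]): Unit = {
    onTaskStart(0, 0); onTaskStart(0, 0) // two tasks running in attempt 0
    onTaskStart(0, 1)                    // stage retried, one task running in attempt 1
    onTaskEnd(0, 0)                      // late completion from the zombie attempt 0
    assert(totalRunningTasks() == 2)     // total stays consistent, never negative
    println(s"running = ${totalRunningTasks()}")
  }
}
```
With the old stage-keyed map, a zombie attempt's completions would decrement the counter that the retried attempt had just reset to 0, which is how the negative totals arose.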
## How was this patch tested?
Added a unit test: "properly handle task end events from completed stages".
Closes #24497 from cxzl25/fix_stuck_job_follow_up.
Authored-by: sychen <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
---
.../apache/spark/ExecutorAllocationManager.scala | 113 ++++++++++++---------
.../org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../org/apache/spark/scheduler/SparkListener.scala | 5 +-
.../spark/ExecutorAllocationManagerSuite.scala | 33 +++++-
project/MimaExcludes.scala | 6 ++
5 files changed, 104 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bb95fea..bceb26c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -491,6 +491,10 @@ private[spark] class ExecutorAllocationManager(
numExecutorsToAdd = 1
}
+ private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
+ override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+ }
+
/**
* A listener that notifies the given allocation manager of when to add and remove executors.
*
@@ -499,29 +503,32 @@ private[spark] class ExecutorAllocationManager(
*/
private[spark] class ExecutorAllocationListener extends SparkListener {
- private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
- // Number of running tasks per stage including speculative tasks.
+ private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
+ // Number of running tasks per stageAttempt including speculative tasks.
// Should be 0 when no stages are active.
- private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
- private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
- // Number of speculative tasks to be scheduled in each stage
- private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
- // The speculative tasks started in each stage
- private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-
- // stageId to tuple (the number of task with locality preferences, a map where each pair is a
- // node and the number of tasks that would like to be scheduled on that node) map,
- // maintain the executor placement hints for each stage Id used by resource framework to better
- // place the executors.
- private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+ private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
+ private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+ // Number of speculative tasks to be scheduled in each stageAttempt
+ private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
+ // The speculative tasks started in each stageAttempt
+ private val stageAttemptToSpeculativeTaskIndices =
+ new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
+
+ // stageAttempt to tuple (the number of task with locality preferences, a map where each pair
+ // is a node and the number of tasks that would like to be scheduled on that node) map,
+ // maintain the executor placement hints for each stageAttempt used by resource framework
+ // to better place the executors.
+ private val stageAttemptToExecutorPlacementHints =
+ new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
+ val stageAttemptId = stageSubmitted.stageInfo.attemptNumber()
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
- stageIdToNumTasks(stageId) = numTasks
- stageIdToNumRunningTask(stageId) = 0
+ stageAttemptToNumTasks(stageAttempt) = numTasks
allocationManager.onSchedulerBacklogged()
// Compute the number of tasks requested by the stage on each host
@@ -536,7 +543,7 @@ private[spark] class ExecutorAllocationManager(
}
}
}
- stageIdToExecutorPlacementHints.put(stageId,
+ stageAttemptToExecutorPlacementHints.put(stageAttempt,
(numTasksPending, hostToLocalTaskCountPerStage.toMap))
// Update the executor placement hints
@@ -546,20 +553,24 @@ private[spark] class ExecutorAllocationManager(
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageId = stageCompleted.stageInfo.stageId
+ val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
- stageIdToNumTasks -= stageId
- stageIdToNumRunningTask -= stageId
- stageIdToNumSpeculativeTasks -= stageId
- stageIdToTaskIndices -= stageId
- stageIdToSpeculativeTaskIndices -= stageId
- stageIdToExecutorPlacementHints -= stageId
+ // do NOT remove stageAttempt from stageAttemptToNumRunningTasks,
+ // because the attempt may still have running tasks,
+ // even after another attempt for the stage is submitted.
+ stageAttemptToNumTasks -= stageAttempt
+ stageAttemptToNumSpeculativeTasks -= stageAttempt
+ stageAttemptToTaskIndices -= stageAttempt
+ stageAttemptToSpeculativeTaskIndices -= stageAttempt
+ stageAttemptToExecutorPlacementHints -= stageAttempt
// Update the executor placement hints
updateExecutorPlacementHints()
// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
- if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
+ if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}
}
@@ -567,19 +578,19 @@ private[spark] class ExecutorAllocationManager(
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageId = taskStart.stageId
+ val stageAttemptId = taskStart.stageAttemptId
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
val taskIndex = taskStart.taskInfo.index
-
allocationManager.synchronized {
- if (stageIdToNumRunningTask.contains(stageId)) {
- stageIdToNumRunningTask(stageId) += 1
- }
-
+ stageAttemptToNumRunningTask(stageAttempt) =
+ stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
// If this is the last pending task, mark the scheduler queue as empty
if (taskStart.taskInfo.speculative) {
- stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
- taskIndex
+ stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
+ new mutable.HashSet[Int]) += taskIndex
} else {
- stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+ stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
+ new mutable.HashSet[Int]) += taskIndex
}
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
@@ -588,13 +599,17 @@ private[spark] class ExecutorAllocationManager(
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- val taskIndex = taskEnd.taskInfo.index
val stageId = taskEnd.stageId
+ val stageAttemptId = taskEnd.stageAttemptId
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
+ val taskIndex = taskEnd.taskInfo.index
allocationManager.synchronized {
- if (stageIdToNumRunningTask.contains(stageId)) {
- stageIdToNumRunningTask(stageId) -= 1
+ if (stageAttemptToNumRunningTask.contains(stageAttempt)) {
+ stageAttemptToNumRunningTask(stageAttempt) -= 1
+ if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
+ stageAttemptToNumRunningTask -= stageAttempt
+ }
}
-
// If the task failed, we expect it to be resubmitted later. To ensure we have
// enough resources to run the resubmitted task, we need to mark the scheduler
// as backlogged again if it's not already marked as such (SPARK-8366)
@@ -603,9 +618,9 @@ private[spark] class ExecutorAllocationManager(
allocationManager.onSchedulerBacklogged()
}
if (taskEnd.taskInfo.speculative) {
- stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+ stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
} else {
- stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+ stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
}
}
}
@@ -613,11 +628,12 @@ private[spark] class ExecutorAllocationManager(
override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
: Unit = {
- val stageId = speculativeTask.stageId
-
+ val stageId = speculativeTask.stageId
+ val stageAttemptId = speculativeTask.stageAttemptId
+ val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
- stageIdToNumSpeculativeTasks(stageId) =
- stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+ stageAttemptToNumSpeculativeTasks(stageAttempt) =
+ stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
allocationManager.onSchedulerBacklogged()
}
}
@@ -629,14 +645,14 @@ private[spark] class ExecutorAllocationManager(
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def pendingTasks(): Int = {
- stageIdToNumTasks.map { case (stageId, numTasks) =>
- numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
+ stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
+ numTasks - stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
}.sum
}
def pendingSpeculativeTasks(): Int = {
- stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
- numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+ stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
+ numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
}.sum
}
@@ -646,9 +662,10 @@ private[spark] class ExecutorAllocationManager(
/**
* The number of tasks currently running across all stages.
+ * Include running-but-zombie stage attempts
*/
def totalRunningTasks(): Int = {
- stageIdToNumRunningTask.values.sum
+ stageAttemptToNumRunningTask.values.sum
}
/**
@@ -662,7 +679,7 @@ private[spark] class ExecutorAllocationManager(
def updateExecutorPlacementHints(): Unit = {
var localityAwareTasks = 0
val localityToCount = new mutable.HashMap[String, Int]()
- stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
+ stageAttemptToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
localityAwareTasks += numTasksPending
localities.foreach { case (hostname, count) =>
val updatedCount = localityToCount.getOrElse(hostname, 0) + count
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index de57807..5072e61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -933,7 +933,7 @@ private[spark] class DAGScheduler(
}
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
- listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+ listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
}
private[scheduler] def handleTaskSetFailed(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 1acfd90..666ce3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -52,7 +52,10 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+case class SparkListenerSpeculativeTaskSubmitted(
+ stageId: Int,
+ stageAttemptId: Int = 0)
+ extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskEnd(
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 2b75f2e..3ba33e3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -254,14 +254,19 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a speculative task doesn't affect the target
- post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+ post(SparkListenerTaskStart(1, 0, createTaskInfo(1, 0, "executor-2", true)))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
}
- test("ignore task end events from completed stages") {
+ test("properly handle task end events from completed stages") {
val manager = createManager(createConf(0, 10, 0))
+
+ // We simulate having a stage fail, but with tasks still running. Then another attempt for
+ // that stage is started, and we get task completions from the first stage attempt. Make sure
+ // the value of `totalTasksRunning` is consistent as tasks finish from both attempts (we count
+ // all running tasks, from the zombie & non-zombie attempts)
val stage = createStageInfo(0, 5)
post(SparkListenerStageSubmitted(stage))
val taskInfo1 = createTaskInfo(0, 0, "executor-1")
@@ -269,10 +274,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
post(SparkListenerTaskStart(0, 0, taskInfo1))
post(SparkListenerTaskStart(0, 0, taskInfo2))
+ // The tasks in the zombie attempt haven't completed yet, so we still count them
post(SparkListenerStageCompleted(stage))
+ // There are still two tasks that belong to the zombie stage running.
+ assert(totalRunningTasks(manager) === 2)
+
+ // submit another attempt for the stage. We count completions from the first zombie attempt
+ val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
+ post(SparkListenerStageSubmitted(stageAttempt1))
post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
- post(SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+ assert(totalRunningTasks(manager) === 1)
+ val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
+ val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
+ post(SparkListenerTaskStart(0, 1, attemptTaskInfo1))
+ post(SparkListenerTaskStart(0, 1, attemptTaskInfo2))
+ assert(totalRunningTasks(manager) === 3)
+ post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, null))
+ assert(totalRunningTasks(manager) === 2)
+ post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, null))
+ assert(totalRunningTasks(manager) === 1)
+ post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, null))
assert(totalRunningTasks(manager) === 0)
}
@@ -1033,9 +1055,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private def createStageInfo(
stageId: Int,
numTasks: Int,
- taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+ taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+ attemptId: Int = 0
): StageInfo = {
- new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+ new StageInfo(stageId, attemptId, "name", numTasks, Seq.empty, Seq.empty, "no details",
taskLocalityPreferences = taskLocalityPreferences)
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 38ec5a0..cb3b803 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -79,6 +79,12 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
+ // [SPARK-27630][CORE] Properly handle task end events from completed stages
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.apply"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.copy"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted.this"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerSpeculativeTaskSubmitted$"),
+
// [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.SparkTransportConf.fromSparkConf"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]