Repository: spark
Updated Branches:
  refs/heads/master a64746677 -> 95aef660b
[SPARK-20205][CORE] Make sure StageInfo is updated before sending event.

The DAGScheduler was sending a "stage submitted" event before it properly
updated the event's information. This meant that a listener (e.g. the event
logging listener) could record wrong information about the event.

This change sets the stage's submission time before the event is submitted,
when there are tasks to be executed in the stage.

Tested with existing unit tests.

Author: Marcelo Vanzin <[email protected]>

Closes #17925 from vanzin/SPARK-20205.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95aef660
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95aef660
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95aef660

Branch: refs/heads/master
Commit: 95aef660b73ec931e746d1ec8ae7848762ba0d7c
Parents: a647466
Author: Marcelo Vanzin <[email protected]>
Authored: Wed May 24 16:57:17 2017 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed May 24 16:57:17 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95aef660/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 875acc3..ab2255f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -983,6 +983,13 @@ class DAGScheduler(
     }
 
     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
+
+    // If there are tasks to execute, record the submission time of the stage. Otherwise,
+    // post the even without the submission time, which indicates that this stage was
+    // skipped.
+    if (partitionsToCompute.nonEmpty) {
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
+    }
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -1054,7 +1061,6 @@ class DAGScheduler(
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run
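
For readers who want to see the ordering problem in isolation, here is a minimal sketch. It is not Spark code: StageInfoSketch, StageSubmittedSketch, RecordingListener and SubmissionOrderSketch are hypothetical stand-ins, and the listener is called synchronously so the result is deterministic. The sketch makes no claim about how the real listener bus delivers events. It only shows that a listener reading the mutable info when the event reaches it misses any field set after the post.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Spark's StageInfo (mutable on purpose).
final class StageInfoSketch(val stageId: Int) {
  var submissionTime: Option[Long] = None
}

// Hypothetical stand-in for SparkListenerStageSubmitted.
final case class StageSubmittedSketch(info: StageInfoSketch)

// Records the submission time it sees when the event is delivered,
// much as an event logging listener would serialize the info at that moment.
final class RecordingListener {
  val seen: ArrayBuffer[Option[Long]] = ArrayBuffer.empty[Option[Long]]
  def onStageSubmitted(event: StageSubmittedSketch): Unit = {
    seen += event.info.submissionTime
  }
}

object SubmissionOrderSketch {
  // Ordering before the change: post the event, then set the time.
  def postThenUpdate(listener: RecordingListener, now: Long): Unit = {
    val info = new StageInfoSketch(0)
    listener.onStageSubmitted(StageSubmittedSketch(info))
    info.submissionTime = Some(now)
  }

  // Ordering after the change: set the time only if there are tasks, then post.
  def updateThenPost(listener: RecordingListener, now: Long, numTasks: Int): Unit = {
    val info = new StageInfoSketch(0)
    if (numTasks > 0) {
      info.submissionTime = Some(now)
    }
    listener.onStageSubmitted(StageSubmittedSketch(info))
  }

  def main(args: Array[String]): Unit = {
    val listener = new RecordingListener
    postThenUpdate(listener, now = 100L)               // listener sees None
    updateThenPost(listener, now = 200L, numTasks = 3) // listener sees Some(200)
    updateThenPost(listener, now = 300L, numTasks = 0) // skipped stage: None
    println(listener.seen.toList) // List(None, Some(200), None)
  }
}
```

With the second ordering, a missing submission time carries meaning: per the comment added in the diff, it indicates that the stage was skipped.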
