Repository: spark
Updated Branches:
  refs/heads/master a64746677 -> 95aef660b


[SPARK-20205][CORE] Make sure StageInfo is updated before sending event.

The DAGScheduler was sending a "stage submitted" event before it had fully
updated the StageInfo carried by that event. This meant that a listener
(e.g. the event logging listener) could record wrong information about the
stage.

This change sets the stage's submission time before the event is submitted,
when there are tasks to be executed in the stage.
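
The ordering issue can be illustrated with a minimal, self-contained sketch.
Everything in it is a hypothetical stand-in, not Spark's actual classes:
`StageInfoSketch` plays the role of the stage's info object, and
`RecordingListener` plays the role of a listener that records what it sees
when the event arrives. Delivery is synchronous here to keep the result
deterministic; with a listener that runs on another thread, the old ordering
would instead be a race.

```scala
// Hypothetical stand-in for the mutable stage info carried by the event.
final class StageInfoSketch(val stageId: Int) {
  var submissionTime: Option[Long] = None
}

// Hypothetical listener that records what it observes at delivery time,
// in the way an event logger would.
final class RecordingListener {
  val seen = scala.collection.mutable.ArrayBuffer.empty[(Int, Option[Long])]
  def onStageSubmitted(info: StageInfoSketch): Unit =
    seen += ((info.stageId, info.submissionTime))
}

object SubmissionOrderSketch {
  // Old ordering: post the event first, set the submission time afterwards.
  def submitOld(info: StageInfoSketch, numTasks: Int, now: Long,
      listener: RecordingListener): Unit = {
    listener.onStageSubmitted(info)
    if (numTasks > 0) info.submissionTime = Some(now)
  }

  // New ordering: set the submission time (only if there are tasks), then post.
  def submitNew(info: StageInfoSketch, numTasks: Int, now: Long,
      listener: RecordingListener): Unit = {
    if (numTasks > 0) info.submissionTime = Some(now)
    listener.onStageSubmitted(info)
  }

  def main(args: Array[String]): Unit = {
    val listener = new RecordingListener
    submitOld(new StageInfoSketch(0), numTasks = 4, now = 1000L, listener)
    submitNew(new StageInfoSketch(1), numTasks = 4, now = 1000L, listener)
    submitNew(new StageInfoSketch(2), numTasks = 0, now = 1000L, listener)
    // Prints (0,None), (1,Some(1000)), (2,None), one per line.
    listener.seen.foreach(println)
  }
}
```

In this sketch, stage 0 is recorded without a submission time even though it
has tasks, which is the wrong record described above. Stage 1 shows the fixed
ordering, and stage 2 shows the case with no tasks, where the missing
submission time is intentional.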

Tested with existing unit tests.

Author: Marcelo Vanzin

Closes #17925 from vanzin/SPARK-20205.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95aef660
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95aef660
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95aef660

Branch: refs/heads/master
Commit: 95aef660b73ec931e746d1ec8ae7848762ba0d7c
Parents: a647466
Author: Marcelo Vanzin
Authored: Wed May 24 16:57:17 2017 -0700
Committer: Marcelo Vanzin
Committed: Wed May 24 16:57:17 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95aef660/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 875acc3..ab2255f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -983,6 +983,13 @@ class DAGScheduler(
     }
 
     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
+
+    // If there are tasks to execute, record the submission time of the stage. Otherwise,
+    // post the event without the submission time, which indicates that this stage was
+    // skipped.
+    if (partitionsToCompute.nonEmpty) {
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
+    }
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -1054,7 +1061,6 @@ class DAGScheduler(
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run

