Repository: spark
Updated Branches:
  refs/heads/master 7f1720813 -> 7b971b91c


[SPARK-2571] Correctly report shuffle read metrics.

Currently, shuffle read metrics are incorrectly reported when a stage has 
multiple shuffle dependencies: they are set to the metrics from just one of 
the shuffle dependencies, rather than the accumulated metrics from all of 
them.  This commit fixes that by merging each new set of shuffle read metrics 
into the task's existing ones, and should probably be back-ported to the 0.9 
branch.

Thanks ryanra for discovering this problem!

cc rxin andrewor14

Author: Kay Ousterhout <[email protected]>

Closes #1476 from kayousterhout/join_bug and squashes the following commits:

0203a16 [Kay Ousterhout] Fix broken unit tests.
f463c2e [Kay Ousterhout] [SPARK-2571] Correctly report shuffle read metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b971b91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b971b91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b971b91

Branch: refs/heads/master
Commit: 7b971b91caeebda57f1506ffc4fd266a1b379290
Parents: 7f17208
Author: Kay Ousterhout <[email protected]>
Authored: Fri Jul 18 14:40:32 2014 -0700
Committer: Kay Ousterhout <[email protected]>
Committed: Fri Jul 18 14:40:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala | 20 +++++++++++++++++++-
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  5 +++--
 .../spark/scheduler/SparkListenerSuite.scala    |  4 ++--
 .../ui/jobs/JobProgressListenerSuite.scala      |  6 +-----
 .../apache/spark/util/JsonProtocolSuite.scala   |  2 +-
 6 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index ac73288..5d59e00 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data 
will be collected here
    */
-  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+
+  def shuffleReadMetrics = _shuffleReadMetrics
 
   /**
    * If this task writes to shuffle output, metrics on the written shuffle 
data will be collected
@@ -87,6 +89,22 @@ class TaskMetrics extends Serializable {
    * Storage statuses of any blocks that have been updated as a result of this 
task.
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
+
+  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for 
this task. */
+  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
+    _shuffleReadMetrics match {
+      case Some(existingMetrics) =>
+        existingMetrics.shuffleFinishTime = math.max(
+          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
+        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
+        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
+        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
+        existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
+        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
+      case None =>
+        _shuffleReadMetrics = Some(newMetrics)
+    }
+  }
 }
 
 private[spark] object TaskMetrics {

http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index a932455..3795994 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging 
{
       shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
       shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
       shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
-      context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
+      context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
     })
 
     new InterruptibleIterator[T](context, completionIter)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 47eb44b..2ff8b25 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
     metrics.resultSerializationTime = (json \ "Result Serialization 
Time").extract[Long]
     metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
     metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
-    metrics.shuffleReadMetrics =
-      Utils.jsonOption(json \ "Shuffle Read 
Metrics").map(shuffleReadMetricsFromJson)
+    Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
+      
metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
+    }
     metrics.shuffleWriteMetrics =
       Utils.jsonOption(json \ "Shuffle Write 
Metrics").map(shuffleWriteMetricsFromJson)
     metrics.inputMetrics =

http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 71f48e2..3b0b8e2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
           taskMetrics.shuffleReadMetrics should be ('defined)
           val sm = taskMetrics.shuffleReadMetrics.get
-          sm.totalBlocksFetched should be > (0)
-          sm.localBlocksFetched should be > (0)
+          sm.totalBlocksFetched should be (128)
+          sm.localBlocksFetched should be (128)
           sm.remoteBlocksFetched should be (0)
           sm.remoteBytesRead should be (0l)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index a855662..b52f818 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+    taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", 
TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
@@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
-    shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", 
TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
@@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
       .shuffleRead === 2000)
 
     // finish this task, should get updated duration
-    shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", 
TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b971b91/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 058d314..11f70a6 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
       sr.localBlocksFetched = e
       sr.fetchWaitTime = a + d
       sr.remoteBlocksFetched = f
-      t.shuffleReadMetrics = Some(sr)
+      t.updateShuffleReadMetrics(sr)
     }
     sw.shuffleBytesWritten = a + b + c
     sw.shuffleWriteTime = b + c + d

Reply via email to