Repository: spark
Updated Branches:
  refs/heads/master 4c7243e10 -> 25921110f


[SPARK-2609] Log thread ID when spilling ExternalAppendOnlyMap

It's useful to know whether one thread is constantly spilling or multiple 
threads are spilling relatively infrequently. Right now everything looks a 
little jumbled and we can't tell which lines belong to the same thread. For 
instance:

```
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (194 
times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (198 
times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (198 
times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 10 MB to disk (197 
times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 9 MB to disk (45 
times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 23 MB to disk (198 
times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 38 MB to disk (25 
times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 161 MB to disk (25 
times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 0 MB to disk (199 
times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (166 
times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (199 
times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (200 
times so far)
```

Author: Andrew Or <[email protected]>

Closes #1517 from andrewor14/external-log and squashes the following commits:

90e48bb [Andrew Or] Log thread ID when spilling


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25921110
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25921110
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25921110

Branch: refs/heads/master
Commit: 25921110fcd5afe568bf0d25fccd232787af7911
Parents: 4c7243e
Author: Andrew Or <[email protected]>
Authored: Wed Jul 23 10:31:45 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Wed Jul 23 10:31:45 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25921110/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 71ab2a3..be8f652 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -106,6 +106,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val fileBufferSize = 
sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
+  private val threadId = Thread.currentThread().getId
 
   /**
    * Insert the given key and value into the map.
@@ -128,7 +129,6 @@ class ExternalAppendOnlyMap[K, V, C](
       // Atomically check whether there is sufficient memory in the global 
pool for
       // this map to grow and, if possible, allocate the required amount
       shuffleMemoryMap.synchronized {
-        val threadId = Thread.currentThread().getId
         val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
         val availableMemory = maxMemoryThreshold -
           (shuffleMemoryMap.values.sum - 
previouslyOccupiedMemory.getOrElse(0L))
@@ -153,8 +153,8 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
-      .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" 
else ""))
+    logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s 
so far)"
+      .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 
1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, 
fileBufferSize)
     var objectsWritten = 0

Reply via email to