Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bf3f6d2f1 -> a447cd888


[SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apache.spark.storage.MemoryStore` may lead to memory leak

## What changes were proposed in this pull request?

An expression like `if (memoryMap(taskAttemptId) == 0) memoryMap.remove(taskAttemptId)`
in the methods `releaseUnrollMemoryForThisTask` and
`releasePendingUnrollMemoryForThisTask` should be evaluated after the memory-release
operation, whether or not `memoryToRelease` is > 0.

If a task's entry in the memory map is already 0 when
`releaseUnrollMemoryForThisTask` or `releasePendingUnrollMemoryForThisTask` is
called, `memoryToRelease` is also 0, the removal check is skipped, and the key
for that task is never removed from the hash map.
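The following Scala sketch contrasts the pre-fix and post-fix bookkeeping. It is a
simplified, hypothetical model rather than Spark's code: `UnrollBookkeeping`,
`reserve`, `releaseBuggy`, `releaseFixed` and the `released` counter (a stand-in
for `memoryManager.releaseUnrollMemory`) are illustrative names, and the
`memoryManager.synchronized` locking around the real methods is omitted. In this
sketch an entry reaches 0 simply by reserving 0 bytes.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of MemoryStore's per-task unroll bookkeeping.
// Class and method names are illustrative only; they are not Spark APIs.
class UnrollBookkeeping {
  // taskAttemptId -> bytes of unroll memory currently held by that task
  val unrollMemoryMap = mutable.HashMap[Long, Long]()
  // Total bytes handed back; a stand-in for memoryManager.releaseUnrollMemory
  var released = 0L

  // Record `bytes` of unroll memory for a task (creates the entry if absent).
  def reserve(taskAttemptId: Long, bytes: Long): Unit =
    unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + bytes

  // Pre-fix shape: the zero check sits inside `if (memoryToRelease > 0)`,
  // so an entry that is already 0 is never removed.
  def releaseBuggy(taskAttemptId: Long, memory: Long = Long.MaxValue): Unit = {
    if (unrollMemoryMap.contains(taskAttemptId)) {
      val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
      if (memoryToRelease > 0) {
        unrollMemoryMap(taskAttemptId) -= memoryToRelease
        if (unrollMemoryMap(taskAttemptId) == 0) {
          unrollMemoryMap.remove(taskAttemptId)
        }
        released += memoryToRelease
      }
    }
  }

  // Post-fix shape: the zero check runs whether or not memoryToRelease > 0.
  def releaseFixed(taskAttemptId: Long, memory: Long = Long.MaxValue): Unit = {
    if (unrollMemoryMap.contains(taskAttemptId)) {
      val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
      if (memoryToRelease > 0) {
        unrollMemoryMap(taskAttemptId) -= memoryToRelease
        released += memoryToRelease
      }
      if (unrollMemoryMap(taskAttemptId) == 0) {
        unrollMemoryMap.remove(taskAttemptId)
      }
    }
  }
}
```

With `releaseBuggy`, a task that reserved 0 bytes keeps its `taskAttemptId -> 0`
entry after release, so the map grows by one key per such task; with
`releaseFixed` the entry is removed, while partial releases still decrement the
balance exactly as before.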

See [SPARK-17465](https://issues.apache.org/jira/browse/SPARK-17465) for details.

Author: Xing SHI

Closes #15022 from saturday-shi/SPARK-17465.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a447cd88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a447cd88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a447cd88

Branch: refs/heads/branch-1.6
Commit: a447cd88897bc3d76eee0e8757e6545019704f30
Parents: bf3f6d2
Author: Xing SHI
Authored: Wed Sep 14 13:46:46 2016 -0700
Committer: Josh Rosen
Committed: Wed Sep 14 13:46:46 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/Task.scala    |  1 +
 .../scala/org/apache/spark/storage/MemoryStore.scala    | 12 ++++++------
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index c7b1199..2f4225e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -104,6 +104,7 @@ private[spark] abstract class Task[T](
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
           SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
           // Notify any tasks waiting for execution memory to be freed to wake up and try to
           // acquire memory again. This makes impossible the scenario where a task sleeps forever
           // because there are no other tasks left to notify it. Since this is safe to do but may

http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index aed0da9..1113160 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
           unrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (unrollMemoryMap(taskAttemptId) == 0) {
-            unrollMemoryMap.remove(taskAttemptId)
-          }
           memoryManager.releaseUnrollMemory(memoryToRelease)
         }
+        if (unrollMemoryMap(taskAttemptId) == 0) {
+          unrollMemoryMap.remove(taskAttemptId)
+        }
       }
     }
   }
@@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
           pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
-            pendingUnrollMemoryMap.remove(taskAttemptId)
-          }
           memoryManager.releaseUnrollMemory(memoryToRelease)
         }
+        if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
+          pendingUnrollMemoryMap.remove(taskAttemptId)
+        }
       }
     }
   }

