Repository: spark Updated Branches: refs/heads/branch-1.6 bf3f6d2f1 -> a447cd888
[SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apache.spark.storage.MemoryStore` may lead to memory leak ## What changes were proposed in this pull request? The check `if (memoryMap(taskAttemptId) == 0) memoryMap.remove(taskAttemptId)` in the methods `releaseUnrollMemoryForThisTask` and `releasePendingUnrollMemoryForThisTask` should be executed after the memory release operation, regardless of whether `memoryToRelease` is > 0. Otherwise, if the memory of a task has already been set to 0 when `releaseUnrollMemoryForThisTask` or `releasePendingUnrollMemoryForThisTask` is called, the key in the memory map corresponding to that task will never be removed from the hash map. See the details in [SPARK-17465](https://issues.apache.org/jira/browse/SPARK-17465). Author: Xing SHI <[email protected]> Closes #15022 from saturday-shi/SPARK-17465. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a447cd88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a447cd88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a447cd88 Branch: refs/heads/branch-1.6 Commit: a447cd88897bc3d76eee0e8757e6545019704f30 Parents: bf3f6d2 Author: Xing SHI <[email protected]> Authored: Wed Sep 14 13:46:46 2016 -0700 Committer: Josh Rosen <[email protected]> Committed: Wed Sep 14 13:46:46 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/scheduler/Task.scala | 1 + .../scala/org/apache/spark/storage/MemoryStore.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/scheduler/Task.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index c7b1199..2f4225e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -104,6 +104,7 @@ private[spark] abstract class Task[T]( Utils.tryLogNonFatalError { // Release memory used by this thread for unrolling blocks SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask() + SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask() // Notify any tasks waiting for execution memory to be freed to wake up and try to // acquire memory again. This makes impossible the scenario where a task sleeps forever // because there are no other tasks left to notify it. Since this is safe to do but may http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index aed0da9..1113160 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { unrollMemoryMap(taskAttemptId) -= memoryToRelease - if (unrollMemoryMap(taskAttemptId) == 0) { - unrollMemoryMap.remove(taskAttemptId) - } memoryManager.releaseUnrollMemory(memoryToRelease) } + if (unrollMemoryMap(taskAttemptId) == 0) { + unrollMemoryMap.remove(taskAttemptId) + } } } } @@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { 
pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease - if (pendingUnrollMemoryMap(taskAttemptId) == 0) { - pendingUnrollMemoryMap.remove(taskAttemptId) - } memoryManager.releaseUnrollMemory(memoryToRelease) } + if (pendingUnrollMemoryMap(taskAttemptId) == 0) { + pendingUnrollMemoryMap.remove(taskAttemptId) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
