Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e4ea2390a -> e2a288cc3


[SPARK-9946] [SPARK-9589] [SQL] fix NPE and thread-safety in TaskMemoryManager

Currently, we access `page.pageNumber` after the page has been freed; by then 
it could have been modified by another thread, causing an NPE.

The same TaskMemoryManager can be used by multiple threads (for example, 
Python UDF and TransportScript), so allocating and freeing memory/pages must 
be thread-safe. The underlying BitSet and HashSet are not thread-safe, so 
accesses to them should be placed inside synchronized blocks.

cc JoshRosen

Author: Davies Liu <[email protected]>

Closes #8177 from davies/memory_manager.

(cherry picked from commit 3bc55287220b1248e935bf817d880ff176ad4d3b)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2a288cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2a288cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2a288cc

Branch: refs/heads/branch-1.5
Commit: e2a288cc35505bdaec0e8d871933e96494ab7908
Parents: e4ea239
Author: Davies Liu <[email protected]>
Authored: Fri Aug 14 12:32:35 2015 -0700
Committer: Davies Liu <[email protected]>
Committed: Fri Aug 14 12:32:44 2015 -0700

----------------------------------------------------------------------
 .../spark/unsafe/memory/TaskMemoryManager.java  | 44 +++++++++++++-------
 1 file changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2a288cc/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
index 358bb37..ca70d7f 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
@@ -144,14 +144,16 @@ public class TaskMemoryManager {
   public void freePage(MemoryBlock page) {
     assert (page.pageNumber != -1) :
       "Called freePage() on memory that wasn't allocated with allocatePage()";
-    executorMemoryManager.free(page);
+    assert(allocatedPages.get(page.pageNumber));
+    pageTable[page.pageNumber] = null;
     synchronized (this) {
       allocatedPages.clear(page.pageNumber);
     }
-    pageTable[page.pageNumber] = null;
     if (logger.isTraceEnabled()) {
       logger.trace("Freed page number {} ({} bytes)", page.pageNumber, 
page.size());
     }
+    // Cannot access a page once it's freed.
+    executorMemoryManager.free(page);
   }
 
   /**
@@ -166,7 +168,9 @@ public class TaskMemoryManager {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     assert(size > 0) : "Size must be positive, but got " + size;
     final MemoryBlock memory = executorMemoryManager.allocate(size);
-    allocatedNonPageMemory.add(memory);
+    synchronized(allocatedNonPageMemory) {
+      allocatedNonPageMemory.add(memory);
+    }
     return memory;
   }
 
@@ -176,8 +180,10 @@ public class TaskMemoryManager {
   public void free(MemoryBlock memory) {
     assert (memory.pageNumber == -1) : "Should call freePage() for pages, not 
free()";
     executorMemoryManager.free(memory);
-    final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
-    assert (!wasAlreadyRemoved) : "Called free() on memory that was already 
freed!";
+    synchronized(allocatedNonPageMemory) {
+      final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
+      assert (!wasAlreadyRemoved) : "Called free() on memory that was already 
freed!";
+    }
   }
 
   /**
@@ -223,9 +229,10 @@ public class TaskMemoryManager {
     if (inHeap) {
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
-      final Object page = pageTable[pageNumber].getBaseObject();
+      final MemoryBlock page = pageTable[pageNumber];
       assert (page != null);
-      return page;
+      assert (page.getBaseObject() != null);
+      return page.getBaseObject();
     } else {
       return null;
     }
@@ -244,7 +251,9 @@ public class TaskMemoryManager {
       // converted the absolute address into a relative address. Here, we 
invert that operation:
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
-      return pageTable[pageNumber].getBaseOffset() + offsetInPage;
+      final MemoryBlock page = pageTable[pageNumber];
+      assert (page != null);
+      return page.getBaseOffset() + offsetInPage;
     }
   }
 
@@ -260,14 +269,17 @@ public class TaskMemoryManager {
         freePage(page);
       }
     }
-    final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
-    while (iter.hasNext()) {
-      final MemoryBlock memory = iter.next();
-      freedBytes += memory.size();
-      // We don't call free() here because that calls Set.remove, which would 
lead to a
-      // ConcurrentModificationException here.
-      executorMemoryManager.free(memory);
-      iter.remove();
+
+    synchronized (allocatedNonPageMemory) {
+      final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
+      while (iter.hasNext()) {
+        final MemoryBlock memory = iter.next();
+        freedBytes += memory.size();
+        // We don't call free() here because that calls Set.remove, which 
would lead to a
+        // ConcurrentModificationException here.
+        executorMemoryManager.free(memory);
+        iter.remove();
+      }
     }
     return freedBytes;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to