Repository: spark
Updated Branches:
  refs/heads/master f6ff2a61d -> ecf30ee7e


http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23cb690..dd4fd53 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -43,6 +43,7 @@ import scala.language.postfixOps
 
 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   with PrivateMethodTester {
+
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
+  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+    new BlockManager(
+      name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+  }
+
   before {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf,
-      securityManager = securityMgr)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
     this.actorSystem = actorSystem
-    conf.set("spark.driver.port", boundPort.toString)
-
-    master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
 
    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
+
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
   }
@@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 1 manager interaction") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 2 managers interaction") {
-    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf,
-      securityMgr, mapOutputTracker)
-    store2 = new BlockManager("exec2", actorSystem, master, new 
KryoSerializer(conf), 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000, "exec1")
+    store2 = makeBlockManager(2000, "exec2")
 
     val peers = master.getPeers(store.blockManagerId, 1)
    assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
@@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
-    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
-    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+    assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
+    assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
     assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
     assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
     assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
@@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
-      memStatus._1 should equal (2000L)
-      memStatus._2 should equal (2000L)
+      memStatus._1 should equal (20000L)
+      memStatus._2 should equal (20000L)
     }
   }
 
   test("removing rdd") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
     store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
@@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing broadcast") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val driverStore = store
-    val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    val executorStore = makeBlockManager(2000, "executor")
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration doesn't dead lock") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
         }
       }
       val t2 = new Thread {
@@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("correct BlockResult returned from get() calls") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
-      mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list1ForSizeEstimate = new ArrayBuffer[Any]
-    list1ForSizeEstimate ++= list1.iterator
-    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
-    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
-    val list2ForSizeEstimate = new ArrayBuffer[Any]
-    list2ForSizeEstimate ++= list2.iterator
-    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
-    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
-    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster 
= true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
+    val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
+    val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
@@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
@@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
@@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
    // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
    val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
     if (tachyonUnitTestEnabled) {
-      store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(1200)
       val a1 = new Array[Byte](400)
       val a2 = new Array[Byte](400)
       val a3 = new Array[Byte](400)
@@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("on-disk storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
-    val a4 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
+    val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
@@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
-    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
@@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
@@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    val list4 = List(new Array[Byte](200), new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, 
tellMaster = true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, 
tellMaster = true)
-    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = 
true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, 
tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, 
tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, 
tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
@@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
    // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("overly large block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(5000)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory 
store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
   }
@@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(20000, "exec1")
      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       store = null
 
       conf.set("spark.shuffle.compress", "false")
-      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+      store = makeBlockManager(20000, "exec2")
+      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "true")
-      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+      store = makeBlockManager(20000, "exec3")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "false")
-      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, 
"broadcast_0 was compressed")
+      store = makeBlockManager(20000, "exec4")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, 
"broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
-      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not 
compressed")
+      store = makeBlockManager(20000, "exec5")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not 
compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
-      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was 
compressed")
+      store = makeBlockManager(20000, "exec6")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was 
compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, 
conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle("other_block", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY)
-      assert(store.memoryStore.getSize("other_block") >= 1000, "other_block 
was compressed")
+      store = makeBlockManager(20000, "exec7")
+      store.putSingle("other_block", new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY)
+      assert(store.memoryStore.getSize("other_block") >= 10000, "other_block 
was compressed")
       store.stop()
       store = null
     } finally {
@@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(Arrays.equals(mappedAsArray, bytes))
     assert(Arrays.equals(notMappedAsArray, bytes))
   }
-  
+
   test("updated block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
-    val bigList = List.fill(8)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
+    val bigList = List.fill(8)(new Array[Byte](2000))
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 =
-      store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
     assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 =
-      store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
+      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
     assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 =
-      store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
       id match {
@@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list1 nor list3")
       }
     }
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory 
store")
 
    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 =
-      store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
       id match {
@@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list2 nor list4")
       }
     }
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory 
store")
 
-    // No updated blocks - nothing is kicked out of memory because list5 is 
too big to be added
+    // No updated blocks - list5 is too big to fit in store and nothing is 
kicked out
     val updatedBlocks5 =
-      store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
+      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
     assert(updatedBlocks5.size === 0)
-    assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(!store.get("list5").isDefined, "list5 was in store")
+
+    // memory store contains only list3 and list4
+    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory 
store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory 
store")
+    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+    // disk store contains only list2
+    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+    assert(!store.diskStore.contains("list5"), "list5 was in disk store")
   }
 
   test("query block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster 
= true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = 
true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, 
tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
    // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("get matching blocks") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](10))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster 
= true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster 
= true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster 
= true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
 
     // insert some more blocks
-    store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
-    store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = false)
-    store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = false)
+    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = true)
+    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = false)
+    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, 
tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
@@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
     assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
   }
+
+  test("reserve/release unroll memory") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    assert(memoryStore.currentUnrollMemory === 0)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Reserve
+    memoryStore.reserveUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 100)
+    memoryStore.reserveUnrollMemoryForThisThread(200)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 300)
+    memoryStore.reserveUnrollMemoryForThisThread(500)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800)
+    memoryStore.reserveUnrollMemoryForThisThread(1000000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
+    // Release
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 700)
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+    // Reserve again
+    memoryStore.reserveUnrollMemoryForThisThread(4400)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
+    memoryStore.reserveUnrollMemoryForThisThread(20000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
+    // Release again
+    memoryStore.releaseUnrollMemoryForThisThread(1000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
+    memoryStore.releaseUnrollMemoryForThisThread() // release all
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+  }
+
+  /**
+   * Verify the result of MemoryStore#unrollSafely is as expected.
+   */
+  private def verifyUnroll(
+      expected: Iterator[Any],
+      result: Either[Array[Any], Iterator[Any]],
+      shouldBeArray: Boolean): Unit = {
+    val actual: Iterator[Any] = result match {
+      case Left(arr: Array[Any]) =>
+        assert(shouldBeArray, "expected iterator from unroll!")
+        arr.iterator
+      case Right(it: Iterator[Any]) =>
+        assert(!shouldBeArray, "expected array from unroll!")
+        it
+      case _ =>
+        fail("unroll returned neither an iterator nor an array...")
+    }
+    expected.zip(actual).foreach { case (e, a) =>
+      assert(e === a, "unroll did not return original values!")
+    }
+  }
+
+  test("safely unroll blocks") {
+    store = makeBlockManager(12000)
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    val memoryStore = store.memoryStore
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with all the space in the world. This should succeed and return an array.
+    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with not enough space. This should succeed after kicking out someBlock1.
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
+    droppedBlocks.clear()
+
+    // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+    // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+    // In the mean time, however, we kicked out someBlock2 before giving up.
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+    verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
+    droppedBlocks.clear()
+  }
+
+  test("safely unroll blocks through putIterator") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(result1.size > 0) // unroll was successful
+    assert(result2.size > 0)
+    assert(result1.data.isLeft) // unroll did not drop this block to disk
+    assert(result2.data.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+    // would not know how to drop them from memory later.
+    memoryStore.remove("b1")
+    memoryStore.remove("b2")
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    assert(result3.size > 0)
+    assert(result3.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.remove("b3")
+    store.putIterator("b3", smallIterator, memOnly)
+
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
+    assert(result4.size === 0) // unroll was unsuccessful
+    assert(result4.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  /**
+   * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
+   */
+  test("safely unroll blocks through putIterator (disk)") {
+    store = makeBlockManager(12000)
+    val memAndDisk = StorageLevel.MEMORY_AND_DISK
+    val memoryStore = store.memoryStore
+    val diskStore = store.diskStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    // Memory store should contain b2 and b3, while disk store should contain only b1
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
+    assert(result3.size > 0)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(diskStore.contains("b1"))
+    assert(!diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    memoryStore.remove("b3")
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll huge block with not enough space. This should fail and drop the new block to disk
+    // directly in addition to kicking out b2 in the process. Memory store should contain only
+    // b3, while disk store should contain b1, b2 and b4.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
+    assert(result4.size > 0)
+    assert(result4.data.isRight) // unroll returned bytes from disk
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(diskStore.contains("b1"))
+    assert(diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    assert(diskStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  test("multiple unrolls by the same thread") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // All unroll memory used is released because unrollSafely returned an array
+    memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll memory is not released because unrollSafely returned an iterator
+    // that still depends on the underlying vector used in the process
+    memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB3 > 0)
+
+    // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+    memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+    // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+    memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
+    assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+    assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+  }
 }
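
The unroll tests above exercise two pieces of the new MemoryStore behavior: reserveUnrollMemoryForThisThread / releaseUnrollMemoryForThisThread, which track how much unroll memory each thread currently holds, and unrollSafely / putIterator, which fall back to returning an iterator (or dropping to disk) when a reservation cannot be granted. For readers following the assertions, here is a minimal standalone Scala sketch of the reservation half; the fixed cap, the thread-id keying, and the silent denial are illustrative assumptions, not the exact MemoryStore logic:

import scala.collection.mutable

// Sketch only: per-thread unroll-memory accounting under a fixed global cap.
class UnrollMemoryTracker(maxUnrollMemory: Long) {
  // Bytes reserved per thread id; all access is guarded by this object's lock.
  private val reserved = mutable.Map.empty[Long, Long]

  def currentUnrollMemory: Long = synchronized { reserved.values.sum }

  def currentUnrollMemoryForThisThread: Long = synchronized {
    reserved.getOrElse(Thread.currentThread().getId, 0L)
  }

  /** Try to reserve `bytes` for the calling thread; returns whether it was granted. */
  def reserveUnrollMemoryForThisThread(bytes: Long): Boolean = synchronized {
    if (reserved.values.sum + bytes <= maxUnrollMemory) {
      val id = Thread.currentThread().getId
      reserved(id) = reserved.getOrElse(id, 0L) + bytes
      true
    } else {
      false // denied: the caller falls back (keep an iterator, or drop to disk)
    }
  }

  /** Release `bytes`, or everything this thread holds if `bytes` is negative. */
  def releaseUnrollMemoryForThisThread(bytes: Long = -1L): Unit = synchronized {
    val id = Thread.currentThread().getId
    if (bytes < 0) {
      reserved.remove(id)
    } else {
      val remaining = math.max(reserved.getOrElse(id, 0L) - bytes, 0L)
      if (remaining == 0L) reserved.remove(id) else reserved(id) = remaining
    }
  }
}

With a 5000-byte cap, this tracker reproduces the grant/deny pattern asserted in the "reserve/release unroll memory" test: reservations accumulate per thread up to the cap, oversized requests are denied without changing the counter, and a bare release drops everything the calling thread holds.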

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
deleted file mode 100644
index 93f0c6a..0000000
--- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
-  val NORMAL_ERROR = 0.20
-  val HIGH_ERROR = 0.30
-
-  test("fixed size insertions") {
-    testWith[Int, Long](10000, i => (i, i.toLong))
-    testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
-    testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
-  }
-
-  test("variable size insertions") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[Int, String](10000, i => (i, randString(0, 10)))
-    testWith[Int, String](10000, i => (i, randString(0, 100)))
-    testWith[Int, String](10000, i => (i, randString(90, 100)))
-  }
-
-  test("updates") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[String, Int](10000, i => (randString(0, 10000), i))
-  }
-
-  def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
-    val map = new SizeTrackingAppendOnlyMap[K, V]()
-    for (i <- 0 until numElements) {
-      val (k, v) = makeElement(i)
-      map(k) = v
-      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
-    }
-  }
-
-  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
-    val betterEstimatedSize = SizeEstimator.estimate(obj)
-    assert(betterEstimatedSize * (1 - error) < estimatedSize,
-      s"Estimated size $estimatedSize was less than expected size 
$betterEstimatedSize")
-    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
-      s"Estimated size $estimatedSize was greater than expected size 
$betterEstimatedSize")
-  }
-}
-
-object SizeTrackingAppendOnlyMapSuite {
-  // Speed test, for reproducibility of results.
-  // These could be highly non-deterministic in general, however.
-  // Results:
-  // AppendOnlyMap:   31 ms
-  // SizeTracker:     54 ms
-  // SizeEstimator: 1500 ms
-  def main(args: Array[String]) {
-    val numElements = 100000
-
-    val baseTimes = for (i <- 0 until 10) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-      }
-    }
-
-    val sampledTimes = for (i <- 0 until 10) yield time {
-      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        map.estimateSize()
-      }
-    }
-
-    val unsampledTimes = for (i <- 0 until 3) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        SizeEstimator.estimate(map)
-      }
-    }
-
-    println("Base: " + baseTimes)
-    println("SizeTracker (sampled): " + sampledTimes)
-    println("SizeEstimator (unsampled): " + unsampledTimes)
-  }
-
-  def time(f: => Unit): Long = {
-    val start = System.currentTimeMillis()
-    f
-    System.currentTimeMillis() - start
-  }
-
-  private class LargeDummyClass {
-    val arr = new Array[Int](100)
-  }
-}
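
The replacement suite below covers the same error bounds for both SizeTrackingAppendOnlyMap and the new SizeTrackingVector. The benchmark figures in the deleted companion object above explain the design under test: running SizeEstimator.estimate on every insert (~1500 ms for 100k inserts) is more than an order of magnitude slower than the tracked collection (~54 ms), so the tracker samples the real size only occasionally and extrapolates in between. A minimal Scala sketch of that sampling idea follows; the exponential schedule with growth rate 1.1 and the straight linear extrapolation are assumptions for illustration, not the exact SizeTracker implementation:

import org.apache.spark.util.SizeEstimator

// Sketch only: estimate a growing collection's size by sampling SizeEstimator
// at exponentially spaced update counts and extrapolating linearly in between.
class SampledSizeTracker(underlying: AnyRef) {
  private var numUpdates = 0L          // updates applied to `underlying` so far
  private var nextSampleNum = 1L       // update count at which to sample next
  private var lastSampledSize = 0L     // SizeEstimator result at the last sample
  private var lastSampledUpdates = 0L  // update count at the last sample
  private var bytesPerUpdate = 0.0     // observed growth between samples

  /** Call this after every insert or update of `underlying`. */
  def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates >= nextSampleNum) takeSample()
  }

  private def takeSample(): Unit = {
    val size = SizeEstimator.estimate(underlying)
    val deltaUpdates = numUpdates - lastSampledUpdates
    if (deltaUpdates > 0) {
      bytesPerUpdate = math.max(0.0, (size - lastSampledSize).toDouble / deltaUpdates)
    }
    lastSampledSize = size
    lastSampledUpdates = numUpdates
    nextSampleNum = math.ceil(numUpdates * 1.1).toLong // back off exponentially
  }

  /** O(1): last sampled size plus extrapolated growth since that sample. */
  def estimateSize(): Long =
    lastSampledSize + (bytesPerUpdate * (numUpdates - lastSampledUpdates)).toLong
}

The exponential schedule keeps estimateSize() cheap on the hot path while amortizing the expensive full passes, which is the speed/accuracy trade-off the NORMAL_ERROR and HIGH_ERROR bounds in the suite are checking.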

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
new file mode 100644
index 0000000..1f33967
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class SizeTrackerSuite extends FunSuite {
+  val NORMAL_ERROR = 0.20
+  val HIGH_ERROR = 0.30
+
+  import SizeTrackerSuite._
+
+  test("vector fixed size insertions") {
+    testVector[Long](10000, i => i.toLong)
+    testVector[(Long, Long)](10000, i => (i.toLong, i.toLong))
+    testVector[LargeDummyClass](10000, i => new LargeDummyClass)
+  }
+
+  test("vector variable size insertions") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testVector[String](10000, i => randString(0, 10))
+    testVector[String](10000, i => randString(0, 100))
+    testVector[String](10000, i => randString(90, 100))
+  }
+
+  test("map fixed size insertions") {
+    testMap[Int, Long](10000, i => (i, i.toLong))
+    testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+    testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass))
+  }
+
+  test("map variable size insertions") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[Int, String](10000, i => (i, randString(0, 10)))
+    testMap[Int, String](10000, i => (i, randString(0, 100)))
+    testMap[Int, String](10000, i => (i, randString(90, 100)))
+  }
+
+  test("map updates") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[String, Int](10000, i => (randString(0, 10000), i))
+  }
+
+  def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) {
+    val vector = new SizeTrackingVector[T]
+    for (i <- 0 until numElements) {
+      val item = makeElement(i)
+      vector += item
+      expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+    val map = new SizeTrackingAppendOnlyMap[K, V]
+    for (i <- 0 until numElements) {
+      val (k, v) = makeElement(i)
+      map(k) = v
+      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+    val betterEstimatedSize = SizeEstimator.estimate(obj)
+    assert(betterEstimatedSize * (1 - error) < estimatedSize,
+      s"Estimated size $estimatedSize was less than expected size 
$betterEstimatedSize")
+    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+      s"Estimated size $estimatedSize was greater than expected size 
$betterEstimatedSize")
+  }
+}
+
+private object SizeTrackerSuite {
+
+  /**
+   * Run speed tests for size tracking collections.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.size < 1) {
+      println("Usage: SizeTrackerSuite [num elements]")
+      System.exit(1)
+    }
+    val numElements = args(0).toInt
+    vectorSpeedTest(numElements)
+    mapSpeedTest(numElements)
+  }
+
+  /**
+   * Speed test for SizeTrackingVector.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   PrimitiveVector  15 ms
+   *   SizeTracker      51 ms
+   *   SizeEstimator    2000 ms
+   */
+  def vectorSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val vector = new SizeTrackingVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        vector.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time {
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        SizeEstimator.estimate(vector)
+      }
+    }
+    printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, 
unsampledTimes)
+  }
+
+  /**
+   * Speed test for SizeTrackingAppendOnlyMap.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   AppendOnlyMap  30 ms
+   *   SizeTracker    41 ms
+   *   SizeEstimator  1666 ms
+   */
+  def mapSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        map.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        SizeEstimator.estimate(map)
+      }
+    }
+    printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, 
unsampledTimes)
+  }
+
+  def printSpeedTestResult(
+      testName: String,
+      baseTimes: Seq[Long],
+      sampledTimes: Seq[Long],
+      unsampledTimes: Seq[Long]): Unit = {
+    println(s"Average times for $testName (ms):")
+    println("  Base - " + averageTime(baseTimes))
+    println("  SizeTracker (sampled) - " + averageTime(sampledTimes))
+    println("  SizeEstimator (unsampled) - " + averageTime(unsampledTimes))
+    println()
+  }
+
+  def time(f: => Unit): Long = {
+    val start = System.currentTimeMillis()
+    f
+    System.currentTimeMillis() - start
+  }
+
+  def averageTime(v: Seq[Long]): Long = {
+    v.sum / v.size
+  }
+
+  private class LargeDummyClass {
+    val arr = new Array[Int](100)
+  }
+}
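
For context on what the sampled estimateSize() above is doing, here is a
minimal sketch of the exponential back-off sampling idea behind the
size-tracking collections. It is illustrative only: the class name, the
estimate callback, and the 1.1 growth factor are assumptions, not Spark's
actual SizeTracker implementation.

    // Sketch only: run the expensive estimator on exponentially spaced
    // updates and extrapolate between samples from bytes-per-update.
    class SampledSizeTracker[T <: AnyRef](collection: T, estimate: T => Long) {
      private val growthRate = 1.1     // assumed back-off factor
      private var numUpdates = 0L
      private var nextSampleAt = 1L
      private var lastSize = 0L
      private var lastUpdates = 0L
      private var bytesPerUpdate = 0.0

      // Call after every insertion or update.
      def afterUpdate(): Unit = {
        numUpdates += 1
        if (numUpdates >= nextSampleAt) {
          val size = estimate(collection)              // expensive full pass
          val delta = (numUpdates - lastUpdates).max(1L)
          bytesPerUpdate = math.max(0.0, (size - lastSize).toDouble / delta)
          lastSize = size
          lastUpdates = numUpdates
          nextSampleAt = math.ceil(numUpdates * growthRate).toLong
        }
      }

      // Cheap O(1) estimate between samples: last sample plus extrapolation.
      def estimateSize(): Long =
        (lastSize + bytesPerUpdate * (numUpdates - lastUpdates)).toLong
    }

This amortizes the estimator to O(1) per insert, which is the gap the speed
tests above measure against the unsampled SizeEstimator baseline.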

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 46e3dd9..2e6c85c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -481,6 +481,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.storage.unrollFraction</code></td>
+  <td>0.2</td>
+  <td>
+    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+    This is dynamically allocated by dropping existing blocks when there is not enough free
+    storage space to unroll the new block in its entirety.
+  </td>
+</tr>
+<tr>
   <td><code>spark.tachyonStore.baseDir</code></td>
   <td>System.getProperty("java.io.tmpdir")</td>
   <td>
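
If the default does not suit a workload, the property can be overridden like
any other storage setting. A minimal sketch, with an illustrative value:

    import org.apache.spark.SparkConf

    // Sketch only: 0.3 is an arbitrary override of the documented 0.2 default.
    val conf = new SparkConf().set("spark.storage.unrollFraction", "0.3")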

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e9220db..5ff88f0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._
 * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
  */
 object MimaExcludes {
-
     def excludes(version: String) =
       version match {
         case v if v.startsWith("1.1") =>
@@ -63,6 +62,15 @@ object MimaExcludes {
               "org.apache.spark.storage.MemoryStore.Entry")
           ) ++
           Seq(
+            // Renamed putValues -> putArray + putIterator
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.MemoryStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.DiskStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.TachyonStore.putValues")
+          ) ++
           Seq(
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
           ) ++
           Seq( // Ignore some private methods in ALS.
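
The new excludes cover the putValues -> putArray + putIterator rename. The
streaming hunk below shows the real call sites; as a hedged sketch with
placeholder names (blockManager, blockId, values, storageLevel), the new
API splits along these lines:

    // Sketch: identifiers are placeholders, mirroring the diff below.
    // putArray stores an already-materialized array; putIterator consumes
    // a one-pass iterator so the store can unroll it incrementally.
    blockManager.putArray(blockId, values.toArray[Any], storageLevel, tellMaster = true)
    blockManager.putIterator(blockId, values.iterator, storageLevel, tellMaster = true)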

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf30ee7/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ce8316b..d934b9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
-      storageLevel, tellMaster = true)
+    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - 
time)  + " ms")
     reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
   }
@@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - 
time)  + " ms")
     reportPushedBlock(blockId, -1, optionalMetadata)
   }
