Repository: spark
Updated Branches:
  refs/heads/branch-1.5 c34fdaf55 -> 0e439c29d


[SPARK-9700] Pick default page size more intelligently.

Previously, we used 64MB as the default page size, which was way too big for a 
lot of Spark applications (especially on a single node).

This patch changes it so that the default page size, if unset by the user, is 
determined by the number of cores available and the total execution memory 
available.

Author: Reynold Xin <[email protected]>

Closes #8012 from rxin/pagesize and squashes the following commits:

16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.

(cherry picked from commit 4309262ec9146d7158ee9957a128bb152289d557)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e439c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e439c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e439c29

Branch: refs/heads/branch-1.5
Commit: 0e439c29d65ca94974440e73254a306c98ea8d6f
Parents: c34fdaf
Author: Reynold Xin <[email protected]>
Authored: Thu Aug 6 23:18:29 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Thu Aug 6 23:18:37 2015 -0700

----------------------------------------------------------------------
 R/run-tests.sh                                  |   2 +-
 .../unsafe/UnsafeShuffleExternalSorter.java     |   3 +-
 .../spark/unsafe/map/BytesToBytesMap.java       |   8 +-
 .../unsafe/sort/UnsafeExternalSorter.java       |   1 -
 .../main/scala/org/apache/spark/SparkConf.scala |   7 +
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/shuffle/ShuffleMemoryManager.scala    |  53 ++++++-
 .../unsafe/UnsafeShuffleWriterSuite.java        |   5 +-
 .../map/AbstractBytesToBytesMapSuite.java       |   6 +-
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |   4 +-
 .../shuffle/ShuffleMemoryManagerSuite.scala     |  14 +-
 python/pyspark/java_gateway.py                  |   1 -
 .../aggregate/TungstenAggregationIterator.scala |   2 +-
 .../sql/execution/joins/HashedRelation.scala    |  16 +-
 .../org/apache/spark/sql/execution/sort.scala   |   4 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 159 -------------------
 .../sql/parquet/ParquetTypesConverter.scala     | 159 +++++++++++++++++++
 .../execution/TestShuffleMemoryManager.scala    |   2 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   1 -
 .../spark/unsafe/array/ByteArrayMethods.java    |   6 +
 21 files changed, 252 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/R/run-tests.sh
----------------------------------------------------------------------
diff --git a/R/run-tests.sh b/R/run-tests.sh
index 18a1e13..e82ad0b 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m 
--driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" 
$FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options 
"-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 
2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 if [[ $FAILED != 0 ]]; then

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index bf4eaa5..f6e0913 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -115,8 +115,7 @@ final class UnsafeShuffleExternalSorter {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
     this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.pageSizeBytes = (int) Math.min(
-      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
-      conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 
shuffleMemoryManager.pageSizeBytes());
     this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ac3736..0636ae7 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -642,7 +642,7 @@ public final class BytesToBytesMap {
   private void allocate(int capacity) {
     assert (capacity >= 0);
     // The capacity needs to be divisible by 64 so that our bit set can be 
sized properly
-    capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 
64);
+    capacity = Math.max((int) Math.min(MAX_CAPACITY, 
ByteArrayMethods.nextPowerOf2(capacity)), 64);
     assert (capacity <= MAX_CAPACITY);
     longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 
2]));
     bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
@@ -770,10 +770,4 @@ public final class BytesToBytesMap {
       timeSpentResizingNs += System.nanoTime() - resizeStartTime;
     }
   }
-
-  /** Returns the next number greater or equal num that is power of 2. */
-  private static long nextPowerOf2(long num) {
-    final long highBit = Long.highestOneBit(num);
-    return (highBit == num) ? num : highBit << 1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4c54ba4..5ebbf9b 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -127,7 +127,6 @@ public final class UnsafeExternalSorter {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for 
units
     // this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.fileBufferSizeBytes = 32 * 1024;
-    // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", 
"64m");
     this.pageSizeBytes = pageSizeBytes;
     this.writeMetrics = new ShuffleWriteMetrics();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 08bab4b..8ff154f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -250,6 +250,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
   }
 
   /**
+   * Get a size parameter as bytes, falling back to a default if not set.
+   */
+  def getSizeAsBytes(key: String, defaultValue: Long): Long = {
+    Utils.byteStringAsBytes(get(key, defaultValue + "B"))
+  }
+
+  /**
    * Get a size parameter as Kibibytes; throws a NoSuchElementException if 
it's not set. If no
    * suffix is provided then Kibibytes are assumed.
    * @throws NoSuchElementException

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0c07053..5662686 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -629,7 +629,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
    * [[org.apache.spark.SparkContext.setLocalProperty]].
    */
   def getLocalProperty(key: String): String =
-    Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+    Option(localProperties.get).map(_.getProperty(key)).orNull
 
   /** Set a human readable description of the current job. */
   def setJobDescription(value: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index adfece4..a796e72 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -324,7 +324,7 @@ object SparkEnv extends Logging {
     val shuffleMgrClass = 
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+    val shuffleMemoryManager = ShuffleMemoryManager.create(conf, 
numUsableCores)
 
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase 
match {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index e3d229c..8c3a726 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,6 +19,9 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
 
 /**
@@ -34,11 +37,19 @@ import org.apache.spark.{Logging, SparkException, 
SparkConf, TaskContext}
  * set of active tasks and redo the calculations of 1 / 2N and 1 / N in 
waiting tasks whenever
  * this set changes. This is all done by synchronizing access on "this" to 
mutate state and using
  * wait() and notifyAll() to signal changes.
+ *
+ * Use `ShuffleMemoryManager.create()` factory method to create a new instance.
+ *
+ * @param maxMemory total amount of memory available for execution, in bytes.
+ * @param pageSizeBytes number of bytes for each page, by default.
  */
-private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
-  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId 
-> memory bytes
+private[spark]
+class ShuffleMemoryManager protected (
+    val maxMemory: Long,
+    val pageSizeBytes: Long)
+  extends Logging {
 
-  def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
+  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId 
-> memory bytes
 
   private def currentTaskAttemptId(): Long = {
     // In case this is called on the driver, return an invalid task attempt id.
@@ -124,15 +135,49 @@ private[spark] class ShuffleMemoryManager(maxMemory: 
Long) extends Logging {
   }
 }
 
+
 private[spark] object ShuffleMemoryManager {
+
+  def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = {
+    val maxMemory = ShuffleMemoryManager.getMaxMemory(conf)
+    val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores)
+    new ShuffleMemoryManager(maxMemory, pageSize)
+  }
+
+  def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
+    new ShuffleMemoryManager(maxMemory, pageSizeBytes)
+  }
+
+  @VisibleForTesting
+  def createForTesting(maxMemory: Long): ShuffleMemoryManager = {
+    new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024)
+  }
+
   /**
    * Figure out the shuffle memory limit from a SparkConf. We currently have 
both a fraction
    * of the memory pool and a safety factor since collections can sometimes 
grow bigger than
    * the size we target before we estimate their sizes again.
    */
-  def getMaxMemory(conf: SparkConf): Long = {
+  private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  /**
+   * Determines the page size, in bytes.
+   *
+   * If the user didn't explicitly set "spark.buffer.pageSize", we figure out the 
default value
+   * by looking at the number of cores available to the process and the total 
amount of memory,
+   * dividing that memory by the number of cores and by a safety factor.
+   */
+  private def getPageSize(conf: SparkConf, maxMemory: Long, numCores: Int): 
Long = {
+    val minPageSize = 1L * 1024 * 1024   // 1MB
+    val maxPageSize = 64L * minPageSize  // 64MB
+    val cores = if (numCores > 0) numCores else 
Runtime.getRuntime.availableProcessors()
+    val safetyFactor = 8
+    val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
+    val default = math.min(maxPageSize, math.max(minPageSize, size))
+    conf.getSizeAsBytes("spark.buffer.pageSize", default)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 98c32bb..c68354b 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -115,6 +115,7 @@ public class UnsafeShuffleWriterSuite {
     taskMetrics = new TaskMetrics();
 
     when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
+    when(shuffleMemoryManager.pageSizeBytes()).thenReturn(128L * 1024 * 1024);
 
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
     when(blockManager.getDiskWriter(
@@ -549,14 +550,14 @@ public class UnsafeShuffleWriterSuite {
     final long recordLengthBytes = 8;
     final long pageSizeBytes = 256;
     final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
-    final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", 
pageSizeBytes + "b");
+    when(shuffleMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
     final UnsafeShuffleWriter<Object, Object> writer =
       new UnsafeShuffleWriter<Object, Object>(
         blockManager,
         shuffleBlockResolver,
         taskMemoryManager,
         shuffleMemoryManager,
-        new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
+        new UnsafeShuffleHandle<>(0, 1, shuffleDep),
         0, // map id
         taskContext,
         conf);

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 3c50033..0b11562 100644
--- 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -48,7 +48,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Before
   public void setup() {
-    shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+    shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, 
PAGE_SIZE_BYTES);
     taskMemoryManager = new TaskMemoryManager(new 
ExecutorMemoryManager(getMemoryAllocator()));
     // Mocked memory manager for tests that check the maximum array size, 
since actually allocating
     // such large arrays will cause us to run out of memory in our tests.
@@ -441,7 +441,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Test
   public void failureToAllocateFirstPage() {
-    shuffleMemoryManager = new ShuffleMemoryManager(1024);
+    shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024);
     BytesToBytesMap map =
       new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, 
PAGE_SIZE_BYTES);
     try {
@@ -461,7 +461,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Test
   public void failureToGrow() {
-    shuffleMemoryManager = new ShuffleMemoryManager(1024 * 10);
+    shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024 * 10);
     BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 
shuffleMemoryManager, 1, 1024);
     try {
       boolean success = true;

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index f530037..83049b8 100644
--- 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -102,7 +102,7 @@ public class UnsafeExternalSorterSuite {
     MockitoAnnotations.initMocks(this);
     sparkConf = new SparkConf();
     tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"unsafe-test");
-    shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+    shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, 
pageSizeBytes);
     spillFilesCreated.clear();
     taskContext = mock(TaskContext.class);
     when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -237,7 +237,7 @@ public class UnsafeExternalSorterSuite {
 
   @Test
   public void spillingOccursInResponseToMemoryPressure() throws Exception {
-    shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+    shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, 
pageSizeBytes);
     final UnsafeExternalSorter sorter = newSorter();
     final int numRecords = (int) pageSizeBytes / 4;
     for (int i = 0; i <= numRecords; i++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index f495b6a..6d45b1a 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
 
 class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
 
@@ -50,7 +50,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
   }
 
   test("single task requesting memory") {
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     assert(manager.tryToAcquire(100L) === 100L)
     assert(manager.tryToAcquire(400L) === 400L)
@@ -72,7 +72,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
     // Two threads request 500 bytes first, wait for each other to get it, and 
then request
     // 500 more; we should immediately return 0 as both are now at 1 / N
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Result1 = -1L
@@ -124,7 +124,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
     // Two tasks request 250 bytes first, wait for each other to get it, and 
then request
     // 500 more; we should only grant 250 bytes to each of them on this second 
request
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Result1 = -1L
@@ -176,7 +176,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
     // for a bit and releases 250 bytes, which should then be granted to t2. 
Further requests
     // by t2 will return false right away because it now has 1 / 2N of the 
memory.
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Requested = false
@@ -241,7 +241,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. 
It sleeps
     // for a bit and releases all its memory. t2 should now be able to grab 
all the memory.
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Requested = false
@@ -307,7 +307,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with 
Timeouts {
   }
 
   test("tasks should not be granted a negative size") {
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
     manager.tryToAcquire(700L)
 
     val latch = new CountDownLatch(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 60be85e..cd4c55f 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -54,7 +54,6 @@ def launch_gateway():
         if os.environ.get("SPARK_TESTING"):
             submit_args = ' '.join([
                 "--conf spark.ui.enabled=false",
-                "--conf spark.buffer.pageSize=4mb",
                 submit_args
             ])
         command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index b9d44aa..4d5e98a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
     TaskContext.get.taskMemoryManager(),
     SparkEnv.get.shuffleMemoryManager,
     1024 * 16, // initial capacity
-    SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
+    SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
     false // disable tracking of performance metrics
   )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 3f257ec..953abf4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
     // This is used in Broadcast, shared by multiple tasks, so we use on-heap 
memory
     val taskMemoryManager = new TaskMemoryManager(new 
ExecutorMemoryManager(MemoryAllocator.HEAP))
 
+    val pageSizeBytes = 
Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+      .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", 
"16m"))
+
     // Dummy shuffle memory manager which always grants all memory allocation 
requests.
     // We use this because it doesn't make sense count shared broadcast 
variables' memory usage
     // towards individual tasks' quotas. In the future, we should devise a 
better way of handling
     // this.
-    val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
-      override def tryToAcquire(numBytes: Long): Long = numBytes
-      override def release(numBytes: Long): Unit = {}
-    }
-
-    val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new 
SparkConf())
-      .getSizeAsBytes("spark.buffer.pageSize", "64m")
+    val shuffleMemoryManager =
+      ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = 
pageSizeBytes)
 
     binaryMap = new BytesToBytesMap(
       taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
     while (i < nKeys) {
       val keySize = in.readInt()
       val valuesSize = in.readInt()
-      if (keySize > keyBuffer.size) {
+      if (keySize > keyBuffer.length) {
         keyBuffer = new Array[Byte](keySize)
       }
       in.readFully(keyBuffer, 0, keySize)
-      if (valuesSize > valuesBuffer.size) {
+      if (valuesSize > valuesBuffer.length) {
         valuesBuffer = new Array[Byte](valuesSize)
       }
       in.readFully(valuesBuffer, 0, valuesSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 7f69cdb..e316930 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
   protected override def doExecute(): RDD[InternalRow] = {
     val schema = child.schema
     val childOutput = child.output
-    val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", 
"64m")
+    val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
 
     /**
      * Set up the sorter in each partition before computing the parent 
partition.

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
deleted file mode 100644
index 3854f5b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types._
-
-
-private[parquet] object ParquetTypesConverter extends Logging {
-  def isPrimitiveType(ctype: DataType): Boolean = ctype match {
-    case _: NumericType | BooleanType | DateType | TimestampType | StringType 
| BinaryType => true
-    case _ => false
-  }
-
-  /**
-   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given 
DECIMAL precision.
-   */
-  private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { 
precision =>
-    var length = 1
-    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
-      length += 1
-    }
-    length
-  }
-
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val converter = new CatalystSchemaConverter()
-    converter.convert(StructType.fromAttributes(attributes))
-  }
-
-  def convertFromString(string: String): Seq[Attribute] = {
-    
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) 
match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $string to row")
-    }
-  }
-
-  def convertToString(schema: Seq[Attribute]): String = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
-    StructType.fromAttributes(schema).json
-  }
-
-  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: 
Configuration): Unit = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to write Parquet metadata: 
path is null")
-    }
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"Unable to write Parquet metadata: path $origPath is incorrectly 
formatted")
-    }
-    val path = origPath.makeQualified(fs)
-    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(s"Expected to write to directory 
$path but found file")
-    }
-    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
-    if (fs.exists(metadataPath)) {
-      try {
-        fs.delete(metadataPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(s"Unable to delete previous 
PARQUET_METADATA_FILE at $metadataPath")
-      }
-    }
-    val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put(
-      CatalystReadSupport.SPARK_METADATA_KEY,
-      ParquetTypesConverter.convertToString(attributes))
-    // TODO: add extra data, e.g., table name, date, etc.?
-
-    val parquetSchema: MessageType = 
ParquetTypesConverter.convertFromAttributes(attributes)
-    val metaData: FileMetaData = new FileMetaData(
-      parquetSchema,
-      extraMetadata,
-      "Spark")
-
-    ParquetRelation.enableLogForwarding()
-    ParquetFileWriter.writeMetadataFile(
-      conf,
-      path,
-      new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
-  }
-
-  /**
-   * Try to read Parquet metadata at the given Path. We first see if there is 
a summary file
-   * in the parent directory. If so, this is used. Else we read the actual 
footer at the given
-   * location.
-   * @param origPath The path at which we expect one (or more) Parquet files.
-   * @param configuration The Hadoop configuration to use.
-   * @return The `ParquetMetadata` containing among other things the schema.
-   */
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): 
ParquetMetadata = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to read Parquet metadata: 
path is null")
-    }
-    val job = new Job()
-    val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
-    val fs: FileSystem = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(s"Incorrectly formatted Parquet 
metadata path $origPath")
-    }
-    val path = origPath.makeQualified(fs)
-
-    val children =
-      fs
-        .globStatus(path)
-        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) 
else List(status) }
-        .filterNot { status =>
-          val name = status.getPath.getName
-          (name(0) == '.' || name(0) == '_') && name != 
ParquetFileWriter.PARQUET_METADATA_FILE
-        }
-
-    ParquetRelation.enableLogForwarding()
-
-    // NOTE (lian): Parquet "_metadata" file can be very slow if the file 
consists of lots of row
-    // groups. Since Parquet schema is replicated among all row groups, we 
only need to touch a
-    // single row group to read schema related metadata. Notice that we are 
making assumptions that
-    // all data in a single Parquet file have the same schema, which is 
normally true.
-    children
-      // Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
-      // ... and fallback to "_metadata" if no such file exists (which implies 
the Parquet file is
-      // empty, thus normally the "_metadata" file is expected to be fairly 
small).
-      .orElse(children.find(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE))
-      .map(ParquetFileReader.readFooter(conf, _, 
ParquetMetadataConverter.NO_FILTER))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata 
at path $path"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
new file mode 100644
index 0000000..3854f5b
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.IOException
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.types._
+
+
+private[parquet] object ParquetTypesConverter extends Logging {
+  def isPrimitiveType(ctype: DataType): Boolean = ctype match {
+    case _: NumericType | BooleanType | DateType | TimestampType | StringType 
| BinaryType => true
+    case _ => false
+  }
+
+  /**
+   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given 
DECIMAL precision.
+   */
+  private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { 
precision =>
+    var length = 1
+    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
+      length += 1
+    }
+    length
+  }
+
+  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+    val converter = new CatalystSchemaConverter()
+    converter.convert(StructType.fromAttributes(attributes))
+  }
+
+  def convertFromString(string: String): Seq[Attribute] = {
+    
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) 
match {
+      case s: StructType => s.toAttributes
+      case other => sys.error(s"Can convert $string to row")
+    }
+  }
+
+  def convertToString(schema: Seq[Attribute]): String = {
+    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+    StructType.fromAttributes(schema).json
+  }
+
+  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: 
Configuration): Unit = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to write Parquet metadata: 
path is null")
+    }
+    val fs = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(
+        s"Unable to write Parquet metadata: path $origPath is incorrectly 
formatted")
+    }
+    val path = origPath.makeQualified(fs)
+    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(s"Expected to write to directory 
$path but found file")
+    }
+    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
+    if (fs.exists(metadataPath)) {
+      try {
+        fs.delete(metadataPath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(s"Unable to delete previous 
PARQUET_METADATA_FILE at $metadataPath")
+      }
+    }
+    val extraMetadata = new java.util.HashMap[String, String]()
+    extraMetadata.put(
+      CatalystReadSupport.SPARK_METADATA_KEY,
+      ParquetTypesConverter.convertToString(attributes))
+    // TODO: add extra data, e.g., table name, date, etc.?
+
+    val parquetSchema: MessageType = 
ParquetTypesConverter.convertFromAttributes(attributes)
+    val metaData: FileMetaData = new FileMetaData(
+      parquetSchema,
+      extraMetadata,
+      "Spark")
+
+    ParquetRelation.enableLogForwarding()
+    ParquetFileWriter.writeMetadataFile(
+      conf,
+      path,
+      new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil)
+  }
+
+  /**
+   * Try to read Parquet metadata at the given Path. We first see if there is 
a summary file
+   * in the parent directory. If so, this is used. Else we read the actual 
footer at the given
+   * location.
+   * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param configuration The Hadoop configuration to use.
+   * @return The `ParquetMetadata` containing among other things the schema.
+   */
+  def readMetaData(origPath: Path, configuration: Option[Configuration]): 
ParquetMetadata = {
+    if (origPath == null) {
+      throw new IllegalArgumentException("Unable to read Parquet metadata: 
path is null")
+    }
+    val job = new Job()
+    val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
+    val fs: FileSystem = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(s"Incorrectly formatted Parquet 
metadata path $origPath")
+    }
+    val path = origPath.makeQualified(fs)
+
+    val children =
+      fs
+        .globStatus(path)
+        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) 
else List(status) }
+        .filterNot { status =>
+          val name = status.getPath.getName
+          (name(0) == '.' || name(0) == '_') && name != 
ParquetFileWriter.PARQUET_METADATA_FILE
+        }
+
+    ParquetRelation.enableLogForwarding()
+
+    // NOTE (lian): Parquet "_metadata" file can be very slow if the file 
consists of lots of row
+    // groups. Since Parquet schema is replicated among all row groups, we 
only need to touch a
+    // single row group to read schema related metadata. Notice that we are 
making assumptions that
+    // all data in a single Parquet file have the same schema, which is 
normally true.
+    children
+      // Try any non-"_metadata" file first...
+      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
+      // ... and fallback to "_metadata" if no such file exists (which implies 
the Parquet file is
+      // empty, thus normally the "_metadata" file is expected to be fairly 
small).
+      .orElse(children.find(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE))
+      .map(ParquetFileReader.readFooter(conf, _, 
ParquetMetadataConverter.NO_FILTER))
+      .getOrElse(
+        throw new IllegalArgumentException(s"Could not find Parquet metadata 
at path $path"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 53de2d0..48c3938 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
 /**
  * A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
  */
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
+class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 
1024 * 1024) {
   private var oom = false
 
   override def tryToAcquire(numBytes: Long): Long = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 167086d..296cc5c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -52,7 +52,6 @@ object TestHive
         .set("spark.sql.test", "")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
-        .set("spark.buffer.pageSize", "4m")
         // SPARK-8910
         .set("spark.ui.enabled", "false")))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e439c29/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git 
a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index cf693d0..70b81ce 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -25,6 +25,12 @@ public class ByteArrayMethods {
     // Private constructor, since this class only contains static methods.
   }
 
+  /** Returns the smallest power of 2 greater than or equal to num (valid for 0 < num <= 2^62). */
+  public static long nextPowerOf2(long num) {
+    final long highBit = Long.highestOneBit(num);
+    return (highBit == num) ? num : highBit << 1;
+  }
+
   public static int roundNumberOfBytesToNearestWord(int numBytes) {
     int remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
     if (remainder == 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to