Repository: spark
Updated Branches:
  refs/heads/branch-1.5 9df2a2d76 -> 703e3f1ea


[SPARK-9945] [SQL] pageSize should be calculated from executor.memory

Currently, the pageSize of TungstenSort is calculated from driver.memory; it should 
use executor.memory instead.

Also, in the worst case, the effective safetyFactor could be 4 (because the old value 
of 8 is halved when the page size is rounded up to the next power of 2), so increase it to 16.

cc rxin

Author: Davies Liu <[email protected]>

Closes #8175 from davies/page_size.

(cherry picked from commit bd35385d53a6b039e0241e3e73092b8b0a8e455a)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/703e3f1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/703e3f1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/703e3f1e

Branch: refs/heads/branch-1.5
Commit: 703e3f1eaee86c8b835468b36613b202041e0810
Parents: 9df2a2d
Author: Davies Liu <[email protected]>
Authored: Thu Aug 13 21:12:59 2015 -0700
Committer: Davies Liu <[email protected]>
Committed: Thu Aug 13 21:13:08 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala  | 4 +++-
 .../src/main/scala/org/apache/spark/sql/execution/sort.scala   | 6 +++---
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/703e3f1e/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 8c3a726..a0d8abc 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -175,7 +175,9 @@ private[spark] object ShuffleMemoryManager {
     val minPageSize = 1L * 1024 * 1024   // 1MB
     val maxPageSize = 64L * minPageSize  // 64MB
     val cores = if (numCores > 0) numCores else 
Runtime.getRuntime.availableProcessors()
-    val safetyFactor = 8
+    // Because of rounding to next power of 2, we may have safetyFactor as 8 
in worst case
+    val safetyFactor = 16
+    // TODO(davies): don't round to next power of 2
     val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
     val default = math.min(maxPageSize, math.max(minPageSize, size))
     conf.getSizeAsBytes("spark.buffer.pageSize", default)

http://git-wip-us.apache.org/repos/asf/spark/blob/703e3f1e/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index e316930..40ef7c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, 
OrderedDistribution, Distribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines various sort operators.
@@ -122,7 +122,6 @@ case class TungstenSort(
   protected override def doExecute(): RDD[InternalRow] = {
     val schema = child.schema
     val childOutput = child.output
-    val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
 
     /**
      * Set up the sorter in each partition before computing the parent 
partition.
@@ -143,6 +142,7 @@ case class TungstenSort(
         }
       }
 
+      val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
       val sorter = new UnsafeExternalRowSorter(
         schema, ordering, prefixComparator, prefixComputer, pageSize)
       if (testSpillFrequency > 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to