Repository: spark Updated Branches: refs/heads/branch-1.5 9df2a2d76 -> 703e3f1ea
[SPARK-9945] [SQL] pageSize should be calculated from executor.memory. Currently, pageSize of TungstenSort is calculated from driver.memory; it should use executor.memory instead. Also, because the page size is rounded up to the next power of 2, the effective safetyFactor could be as low as 4 in the worst case, so increase it to 16. cc rxin Author: Davies Liu <[email protected]> Closes #8175 from davies/page_size. (cherry picked from commit bd35385d53a6b039e0241e3e73092b8b0a8e455a) Signed-off-by: Davies Liu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/703e3f1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/703e3f1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/703e3f1e Branch: refs/heads/branch-1.5 Commit: 703e3f1eaee86c8b835468b36613b202041e0810 Parents: 9df2a2d Author: Davies Liu <[email protected]> Authored: Thu Aug 13 21:12:59 2015 -0700 Committer: Davies Liu <[email protected]> Committed: Thu Aug 13 21:13:08 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala | 4 +++- .../src/main/scala/org/apache/spark/sql/execution/sort.scala | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/703e3f1e/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 8c3a726..a0d8abc 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -175,7 +175,9 @@ private[spark] object ShuffleMemoryManager { val minPageSize = 1L * 1024 * 1024 // 1MB val maxPageSize = 64L * 
minPageSize // 64MB val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors() - val safetyFactor = 8 + // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case + val safetyFactor = 16 + // TODO(davies): don't round to next power of 2 val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor) val default = math.min(maxPageSize, math.max(minPageSize, size)) conf.getSizeAsBytes("spark.buffer.pageSize", default) http://git-wip-us.apache.org/repos/asf/spark/blob/703e3f1e/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index e316930..40ef7c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -17,15 +17,15 @@ package org.apache.spark.sql.execution -import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext} import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution, Distribution} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.sql.types.StructType import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext} //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines various sort operators. 
@@ -122,7 +122,6 @@ case class TungstenSort( protected override def doExecute(): RDD[InternalRow] = { val schema = child.schema val childOutput = child.output - val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes /** * Set up the sorter in each partition before computing the parent partition. @@ -143,6 +142,7 @@ case class TungstenSort( } } + val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes val sorter = new UnsafeExternalRowSorter( schema, ordering, prefixComparator, prefixComputer, pageSize) if (testSpillFrequency > 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
