Repository: spark Updated Branches: refs/heads/master 66d9d0edf -> 2ef4c5963
[SPARK-13830] prefer block manager than direct result for large result ## What changes were proposed in this pull request? The current RPC can't handle large blocks very well; it is very slow to fetch a 100M block (about 1 minute). After switching to the block manager to fetch it, the fetch took about 10 seconds (this could still be improved). ## How was this patch tested? Existing unit tests. Author: Davies Liu <[email protected]> Closes #11659 from davies/direct_result. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ef4c596 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ef4c596 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ef4c596 Branch: refs/heads/master Commit: 2ef4c5963bff3574fe17e669d703b25ddd064e5d Parents: 66d9d0e Author: Davies Liu <[email protected]> Authored: Fri Mar 11 15:39:21 2016 -0800 Committer: Davies Liu <[email protected]> Committed: Fri Mar 11 15:39:21 2016 -0800 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/executor/Executor.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2ef4c596/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index e88d6cd..07e3c12 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -97,9 +97,11 @@ private[spark] class Executor( // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) - // Max RPC message size. If task result is bigger than this, we use the block manager + // Max size of direct result. 
If task result is bigger than this, we use the block manager // to send the result back. - private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) + private val maxDirectResultSize = Math.min( + conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20), + RpcUtils.maxMessageSizeBytes(conf)) // Limit of bytes for total size of results (default is 1GB) private val maxResultSize = Utils.getMaxResultSize(conf) @@ -279,6 +281,7 @@ private[spark] class Executor( // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() + // TODO: do not serialize value twice val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit @@ -290,7 +293,7 @@ private[spark] class Executor( s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) - } else if (resultSize >= maxRpcMessageSize) { + } else if (resultSize > maxDirectResultSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
