Repository: spark Updated Branches: refs/heads/master 6948ab6f8 -> d8ccf655f
[SPARK-3570] Include time to open files in shuffle write time. Opening shuffle files can be very significant when the disk is contended, especially when using ext3. While writing data to a file can avoid hitting disk (and instead hit the buffer cache), opening a file always involves writing some metadata about the file to disk, so the open time can be a very significant portion of the shuffle write time. In one job I ran recently, the time to write shuffle data to the file was only 4ms for each task, but the time to open the file was about 100x as long (~400ms). When we add metrics about spilled data (#2504), we should ensure that the file open time is also included there. Author: Kay Ousterhout <[email protected]> Closes #4550 from kayousterhout/SPARK-3570 and squashes the following commits: ea3a4ae [Kay Ousterhout] Added comment about excluded open time fdc5185 [Kay Ousterhout] Improved comment 42b7e43 [Kay Ousterhout] Fixed parens for nanotime 2423555 [Kay Ousterhout] [SPARK-3570] Include time to open files in shuffle write time. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8ccf655 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8ccf655 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8ccf655 Branch: refs/heads/master Commit: d8ccf655f344eed65cdaf5d9252f1b565b8406ca Parents: 6948ab6 Author: Kay Ousterhout <[email protected]> Authored: Tue Mar 24 16:29:40 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Tue Mar 24 16:29:40 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/shuffle/FileShuffleBlockManager.scala | 4 ++++ .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 3 +++ .../scala/org/apache/spark/util/collection/ExternalSorter.scala | 5 +++++ 3 files changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d8ccf655/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index 660df00..d0178df 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null + val openStartTime = System.nanoTime val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup() Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => @@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf) blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics) } } + // Creating the file to write to and creating a disk writer both involve interacting with + // the disk, so should be included in the shuffle write time. + writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) override def releaseWriters(success: Boolean) { if (consolidateShuffleFiles) { http://git-wip-us.apache.org/repos/asf/spark/blob/d8ccf655/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index fa2e617..55ea0f1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C]( sorter.insertAll(records) } + // Don't bother including the time to open the merged output file in the shuffle write time, + // because it just opens a single file, so is typically too fast to measure accurately + // (see SPARK-3570). val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) http://git-wip-us.apache.org/repos/asf/spark/blob/d8ccf655/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 3262e67..b962c10 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -352,6 +352,7 @@ private[spark] class ExternalSorter[K, V, C]( // Create our file writers if we haven't done so yet if (partitionWriters == null) { curWriteMetrics = new ShuffleWriteMetrics() + val openStartTime = System.nanoTime partitionWriters = Array.fill(numPartitions) { // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use @@ -359,6 +360,10 @@ private[spark] class ExternalSorter[K, V, C]( val (blockId, file) = diskBlockManager.createTempShuffleBlock() blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open() } + // Creating the file to write to and creating a disk writer both involve interacting with + // the disk, and can take a long time in aggregate when we open many files, so should be + // included in the shuffle write time. + curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) } // No need to sort stuff, just write each element out --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
