This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d84f1a3575c4 [SPARK-49217][CORE] Support separate buffer size configuration in UnsafeShuffleWriter
d84f1a3575c4 is described below
commit d84f1a3575c4125009374521d2f179089ebd71ad
Author: sychen <[email protected]>
AuthorDate: Fri Aug 23 02:35:30 2024 -0500
[SPARK-49217][CORE] Support separate buffer size configuration in
UnsafeShuffleWriter
### What changes were proposed in this pull request?
This PR aims to support separate buffer size configuration in
UnsafeShuffleWriter.
Introduce `spark.shuffle.file.merge.buffer` configuration.
### Why are the changes needed?
`UnsafeShuffleWriter#mergeSpillsWithFileStream` uses
`spark.shuffle.file.buffer` as the buffer for reading spill files, and this
buffer is an off-heap buffer.
In the spill process, we hope that the buffer size is larger, but once
there are too many files in the spill,
`UnsafeShuffleWriter#mergeSpillsWithFileStream` needs to create a lot of
off-heap memory, which makes the executor easily killed by YARN.
https://github.com/apache/spark/blob/e72d21c299a450e48b3cf6e5d36b8f3e9a568088/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L372-L375
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Production environment verification
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47733 from cxzl25/SPARK-49217.
Authored-by: sychen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 8 ++++----
.../main/scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
docs/configuration.md | 10 ++++++++++
3 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 13fd18c0942b..ac9d335d6359 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -87,7 +87,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;
private final int initialSortBufferSize;
- private final int inputBufferSizeInBytes;
+ private final int mergeBufferSizeInBytes;
@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
@@ -140,8 +140,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.transferToEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
     this.initialSortBufferSize =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
-    this.inputBufferSizeInBytes =
-      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
+    this.mergeBufferSizeInBytes =
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_MERGE_BUFFER_SIZE()) * 1024;
open();
}
@@ -372,7 +372,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
for (int i = 0; i < spills.length; i++) {
spillInputStreams[i] = new NioBufferedFileInputStream(
spills[i].file,
- inputBufferSizeInBytes);
+ mergeBufferSizeInBytes);
// Only convert the partitionLengths when debug level is enabled.
if (logger.isDebugEnabled()) {
           logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 940d72df5df6..e9e411cc56b5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1466,6 +1466,14 @@ package object config {
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")
+  private[spark] val SHUFFLE_FILE_MERGE_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.file.merge.buffer")
+      .doc("Size of the in-memory buffer for each shuffle file input stream, in KiB unless " +
+        "otherwise specified. These buffers use off-heap buffers and are related to the number " +
+        "of files in the shuffle file. Too large buffers should be avoided.")
+      .version("4.0.0")
+      .fallbackConf(SHUFFLE_FILE_BUFFER_SIZE)
+
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
       .doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
diff --git a/docs/configuration.md b/docs/configuration.md
index 2881660eded6..532da87f5626 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1029,6 +1029,16 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>1.4.0</td>
</tr>
+<tr>
+ <td><code>spark.shuffle.file.merge.buffer</code></td>
+ <td>32k</td>
+ <td>
+    Size of the in-memory buffer for each shuffle file input stream, in KiB unless otherwise
+    specified. These buffers use off-heap buffers and are related to the number of files in
+    the shuffle file. Too large buffers should be avoided.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
<td>32k</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]