Repository: spark
Updated Branches:
refs/heads/master 463a67668 -> 6c9e5ac9d
[SPARK-25776][CORE]The disk write buffer size must be greater than 12
## What changes were proposed in this pull request?
In `UnsafeSorterSpillWriter.java`, when we write a record to a spill file with
`void write(Object baseObject, long baseOffset, int recordLength, long keyPrefix)`,
`recordLength` and `keyPrefix` are written to the disk write buffer first. Together
they take 12 bytes (a 4-byte int plus an 8-byte long), so the disk write buffer
size must be greater than 12.
If `diskWriteBufferSize` is 10, it will print this exception info:
java.lang.ArrayIndexOutOfBoundsException: 10
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.writeLongToBuffer(UnsafeSorterSpillWriter.java:91)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:123)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
    at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:65)
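The failure mode above can be illustrated with a minimal standalone sketch (not Spark's actual code): it mimics how the spill writer stores the 4-byte record length and 8-byte key prefix in the disk write buffer before any record data, so any buffer shorter than 12 bytes overflows while writing the key prefix.

```java
// Illustrative sketch of the spill writer's header layout; class and
// method names here are hypothetical, not Spark's.
public class SpillBufferSketch {
    // Minimum buffer size for the two header fields:
    // 4 bytes (int recordLength) + 8 bytes (long keyPrefix) = 12.
    static int headerBytes() {
        return Integer.BYTES + Long.BYTES;
    }

    // Writes an int followed by a long into buf, big-endian, as the
    // header is laid out. Throws ArrayIndexOutOfBoundsException when
    // buf.length < 12, matching the reported stack trace.
    static void writeHeader(byte[] buf, int recordLength, long keyPrefix) {
        int pos = 0;
        for (int i = 3; i >= 0; i--) {
            buf[pos++] = (byte) (recordLength >>> (8 * i));
        }
        for (int i = 7; i >= 0; i--) {
            buf[pos++] = (byte) (keyPrefix >>> (8 * i));
        }
    }

    public static void main(String[] args) {
        // A 12-byte buffer is just large enough for the header.
        writeHeader(new byte[headerBytes()], 42, 7L);
        try {
            // A 10-byte buffer fails at index 10, as in the bug report.
            writeHeader(new byte[10], 42, 7L);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("overflow writing keyPrefix");
        }
    }
}
```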
## How was this patch tested?
Existing UT in `UnsafeExternalSorterSuite`
Closes #22754 from 10110346/diskWriteBufferSize.
Authored-by: liuxian <[email protected]>
Signed-off-by: Kazuaki Ishizaki <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c9e5ac9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c9e5ac9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c9e5ac9
Branch: refs/heads/master
Commit: 6c9e5ac9de3d0ae5ea86b768608b42b5feb46df4
Parents: 463a676
Author: liuxian <[email protected]>
Authored: Mon Nov 5 01:55:13 2018 +0900
Committer: Kazuaki Ishizaki <[email protected]>
Committed: Mon Nov 5 01:55:13 2018 +0900
----------------------------------------------------------------------
.../util/collection/unsafe/sort/UnsafeSorterSpillWriter.java | 5 ++++-
.../main/scala/org/apache/spark/internal/config/package.scala | 6 ++++--
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 9399024..c1d71a2 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -42,7 +42,10 @@ public final class UnsafeSorterSpillWriter {
private final SparkConf conf = new SparkConf();
-  /** The buffer size to use when writing the sorted records to an on-disk file */
+  /**
+   * The buffer size to use when writing the sorted records to an on-disk file.
+   * Because the record length (4 bytes) and key prefix (8 bytes) are written
+   * first, this size must be greater than 12 bytes.
+   */
private final int diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 034e5eb..c8993e1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
package object config {
@@ -504,8 +505,9 @@ package object config {
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
      .doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
-      .checkValue(v => v > 0 && v <= Int.MaxValue,
-        s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
+      .checkValue(v => v > 12 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        s"The buffer size must be greater than 12 and less than or equal to " +
+        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
.createWithDefault(1024 * 1024)
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
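The new `checkValue` bounds can be sketched as a standalone predicate (this is illustrative, not Spark's actual `ConfigBuilder` code): the buffer must hold the 12-byte header, and must fit in a JVM byte array. `MAX_ROUNDED_ARRAY_LENGTH` is assumed here to be `Integer.MAX_VALUE - 15`, as defined in Spark's `ByteArrayMethods`.

```java
// Hypothetical sketch of the validation added by this commit.
public class DiskWriteBufferCheck {
    // Assumed value of ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH:
    // the largest array length that stays word-aligned on the JVM.
    static final int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;

    // Mirrors the new check: strictly greater than the 12-byte header,
    // and no larger than the maximum JVM array size.
    static boolean isValid(long v) {
        return v > 12 && v <= MAX_ROUNDED_ARRAY_LENGTH;
    }

    public static void main(String[] args) {
        System.out.println(isValid(10));          // rejected: too small for the header
        System.out.println(isValid(1024 * 1024)); // accepted: the 1 MiB default
    }
}
```

Note that the default of `1024 * 1024` bytes comfortably satisfies both bounds; only explicitly configured small values were affected by the old `v > 0` check.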
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]