This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 76d2878b61bd [SPARK-53188][CORE][SQL] Support `readFully` in `SparkStreamUtils` and `JavaUtils`
76d2878b61bd is described below

commit 76d2878b61bd0be878f30aff230b01cd0d5c96cb
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Aug 7 21:19:04 2025 -0700

    [SPARK-53188][CORE][SQL] Support `readFully` in `SparkStreamUtils` and `JavaUtils`

    ### What changes were proposed in this pull request?

    This PR aims to support `readFully` in `SparkStreamUtils` and `JavaUtils`, built on the Java 9+ `readNBytes` API.

    ```java
    public static void readFully(InputStream in, byte[] arr, int off, int len) throws IOException {
      if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
        throw new IllegalArgumentException("Invalid input argument");
      }
      if (len != in.readNBytes(arr, off, len)) {
        throw new EOFException("Fail to read " + len + " bytes.");
      }
    }
    ```

    ### Why are the changes needed?

    To replace Guava's `ByteStreams.readFully` with Spark's own helper at the existing call sites, for example:

    ```scala
    - ByteStreams.readFully(is, rowBuffer, 0, rowSize)
    + Utils.readFully(is, rowBuffer, 0, rowSize)
    ```

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51915 from dongjoon-hyun/SPARK-53188.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
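Note (illustration only, not part of the commit): a minimal caller-side sketch of the new API, assuming `org.apache.spark.util.Utils` exposes the new `readFully` the same way the call sites in the diff below use it; the object name and buffer sizes are invented for this example.

```scala
import java.io.{ByteArrayInputStream, EOFException}

import org.apache.spark.util.Utils

object ReadFullyExample {
  def main(args: Array[String]): Unit = {
    val data = Array.tabulate[Byte](16)(_.toByte)

    // Happy path: the stream holds at least `len` bytes, so the buffer is filled.
    val buffer = new Array[Byte](16)
    Utils.readFully(new ByteArrayInputStream(data), buffer, 0, buffer.length)

    // Short stream: readNBytes returns fewer than `len` bytes, so the helper
    // throws EOFException instead of returning a silently partial buffer.
    try {
      Utils.readFully(new ByteArrayInputStream(data), new Array[Byte](32), 0, 32)
    } catch {
      case e: EOFException => println(s"expected: ${e.getMessage}")
    }
  }
}
```

The contract mirrors Guava's `ByteStreams.readFully`: either the requested range is filled completely or an exception is thrown, never a silent partial read.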
---
 .../spark/network/shuffle/ExternalBlockHandlerSuite.java     |  3 +--
 .../main/java/org/apache/spark/network/util/JavaUtils.java   | 12 ++++++++++++
 .../main/scala/org/apache/spark/util/SparkStreamUtils.scala  |  5 +++++
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java |  4 ++--
 .../scala/org/apache/spark/security/CryptoStreamUtils.scala  |  3 +--
 core/src/main/scala/org/apache/spark/util/Utils.scala        |  2 +-
 .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala   |  5 ++---
 .../scala/org/apache/spark/io/CompressionCodecSuite.scala    |  5 ++---
 .../spark/storage/ShuffleBlockFetcherIteratorSuite.scala     |  3 +--
 dev/checkstyle.xml                                           |  4 ++++
 scalastyle-config.xml                                        |  5 +++++
 .../org/apache/spark/sql/execution/UnsafeRowSerializer.scala |  7 +++----
 .../streaming/state/HDFSBackedStateStoreProvider.scala       |  9 ++++-----
 .../sql/execution/streaming/state/StateStoreChangelog.scala  |  7 +++----
 .../streaming/StreamingQueryHashPartitionVerifySuite.scala   |  5 ++---
 15 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index f7edc8837fde..2a3135e3c8ae 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.Timer;
-import com.google.common.io.ByteStreams;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -136,7 +135,7 @@ public class ExternalBlockHandlerSuite {
     CheckedInputStream checkedIn = new CheckedInputStream(
       blockMarkers[0].createInputStream(), checksum);
     byte[] buffer = new byte[10];
-    ByteStreams.readFully(checkedIn, buffer, 0, (int) blockMarkers[0].size());
+    JavaUtils.readFully(checkedIn, buffer, 0, (int) blockMarkers[0].size());
     long checksumByWriter = checkedIn.getChecksum().getValue();
 
     // when checksumByWriter == checksumRecalculated and checksumByReader != checksumByWriter
diff --git a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 9e2db5250566..3a2485520c66 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -610,6 +610,18 @@ public class JavaUtils {
     }
   }
 
+  /**
+   * Read len bytes exactly, otherwise throw exceptions.
+   */
+  public static void readFully(InputStream in, byte[] arr, int off, int len) throws IOException {
+    if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
+      throw new IllegalArgumentException("Invalid input argument");
+    }
+    if (len != in.readNBytes(arr, off, len)) {
+      throw new EOFException("Fail to read " + len + " bytes.");
+    }
+  }
+
   /**
    * Copy the content of a URL into a file.
    */
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
index e1a1c98987e8..d4e44b019120 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
@@ -20,6 +20,7 @@ import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}
 import java.nio.channels.{FileChannel, WritableByteChannel}
 import java.nio.charset.StandardCharsets
 
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally
 
 private[spark] trait SparkStreamUtils {
@@ -109,6 +110,10 @@ private[spark] trait SparkStreamUtils {
   def toString(in: InputStream): String = {
     new String(in.readAllBytes(), StandardCharsets.UTF_8)
   }
+
+  def readFully(in: InputStream, arr: Array[Byte], off: Int, len: Int): Unit = {
+    JavaUtils.readFully(in, arr, off, len)
+  }
 }
 
 private [spark] object SparkStreamUtils extends SparkStreamUtils
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 0693f8cb1a80..6674fcd73b92 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -17,7 +17,6 @@ package org.apache.spark.util.collection.unsafe.sort;
 
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
@@ -27,6 +26,7 @@ import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
@@ -128,7 +128,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       arr = new byte[recordLength];
       baseObject = arr;
     }
-    ByteStreams.readFully(in, arr, 0, recordLength);
+    JavaUtils.readFully(in, arr, 0, recordLength);
     numRecordsRemaining--;
    if (numRecordsRemaining == 0) {
      close();
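Note (illustration only, not part of the commit): the hunks above add the Java implementation and its Scala forwarder; the remaining hunks switch call sites over to it. Since the new helper has no two-argument overload, callers of Guava's `ByteStreams.readFully(in, buf)` now pass the offset and length explicitly, as the `CryptoStreamUtils` and `Utils.scala` hunks below do. A minimal sketch of that pattern, with invented names (`ReadHeaderExample`, `readHeader`):

```scala
import java.io.{ByteArrayInputStream, InputStream}

import org.apache.spark.util.Utils

object ReadHeaderExample {
  // Guava's two-argument ByteStreams.readFully(in, header) has no direct
  // counterpart here, so the offset and length are spelled out explicitly.
  def readHeader(in: InputStream, headerLength: Int): Array[Byte] = {
    val header = new Array[Byte](headerLength)
    Utils.readFully(in, header, 0, header.length)
    header
  }

  def main(args: Array[String]): Unit = {
    println(readHeader(new ByteArrayInputStream(Array[Byte](1, 2, 3, 4)), 4).mkString(", "))
  }
}
```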
diff --git a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 8bdb80d65207..b230df93e1fa 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -26,7 +26,6 @@ import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.ByteStreams
 import org.apache.commons.crypto.random._
 import org.apache.commons.crypto.stream._
 
@@ -84,7 +83,7 @@ private[spark] object CryptoStreamUtils extends Logging {
       sparkConf: SparkConf,
       key: Array[Byte]): InputStream = {
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    ByteStreams.readFully(is, iv)
+    JavaUtils.readFully(is, iv, 0, IV_LENGTH_IN_BYTES)
     val params = new CryptoParams(key, sparkConf)
     new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
       new IvParameterSpec(iv))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fc9375025881..57f70489a860 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1589,7 +1589,7 @@ private[spark] object Utils
     try {
       stream.skipNBytes(effectiveStart)
-      ByteStreams.readFully(stream, buff)
+      readFully(stream, buff, 0, buff.length)
     } finally {
       stream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 68b181de2928..992bb37f44a0 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
@@ -145,7 +144,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
     val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
     val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
-    ByteStreams.readFully(inputStream, bytesFromStream)
+    Utils.readFully(inputStream, bytesFromStream, 0, bytesFromStream.length)
     assert(bytesFromStream === bytes1.array() ++ bytes2.array())
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ff971b72d891..d6f0bfd237e4 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,10 +20,9 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.util.Locale
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.{SparkConf, SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
+import org.apache.spark.util.Utils
 
 class CompressionCodecSuite extends SparkFunSuite {
   val conf = new SparkConf(false)
@@ -158,7 +157,7 @@ class CompressionCodecSuite extends SparkFunSuite {
     }
     val concatenatedBytes = codec.compressedInputStream(new ByteArrayInputStream(bytes1 ++ bytes2))
     val decompressed: Array[Byte] = new Array[Byte](128)
-    ByteStreams.readFully(concatenatedBytes, decompressed)
+    Utils.readFully(concatenatedBytes, decompressed, 0, decompressed.length)
     assert(decompressed.toSeq === (0 to 127))
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 3de78407ca16..211de2e8729e 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -29,7 +29,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
 // scalastyle:on executioncontextglobal
 import scala.concurrent.Future
 
-import com.google.common.io.ByteStreams
 import io.netty.util.internal.OutOfDirectMemoryError
 import org.apache.logging.log4j.Level
 import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -289,7 +288,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
     intercept[FetchFailedException] {
       val inputStream = iterator.next()._2
       // Consume the data to trigger the corruption
-      ByteStreams.readFully(inputStream, new Array[Byte](100))
+      Utils.readFully(inputStream, new Array[Byte](100), 0, 100)
     }
     // The block will be fetched only once because corruption can't be detected in
     // maxBytesInFlight/3 of the data size
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 78f02b957b47..f7c4801c9e6f 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -213,6 +213,10 @@
     <module name="RegexpSinglelineJava">
        <property name="format" value="ByteStreams\.skipFully"/>
        <property name="message" value="Use Java skipNBytes instead." />
    </module>
+   <module name="RegexpSinglelineJava">
+       <property name="format" value="ByteStreams\.readFully"/>
+       <property name="message" value="Use readFully of JavaUtils/SparkStreamUtils/Utils instead." />
+   </module>
    <module name="RegexpSinglelineJava">
        <property name="format" value="FileUtils.writeStringToFile"/>
        <property name="message" value="Use java.nio.file.Files.writeString instead." />
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 36c31c8e3187..3b3dce52b36c 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -737,6 +737,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java `skipNBytes` instead.</customMessage>
   </check>
 
+  <check customId="readFully" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bByteStreams\.readFully\b</parameter></parameters>
+    <customMessage>Use readFully of JavaUtils/SparkStreamUtils/Utils instead.</customMessage>
+  </check>
+
   <check customId="maputils" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">org\.apache\.commons\.collections4\.MapUtils\b</parameter></parameters>
     <customMessage>Use org.apache.spark.util.collection.Utils instead.</customMessage>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 42fcfa8d60fa..9728d664998e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -22,13 +22,12 @@ import java.nio.ByteBuffer
 
 import scala.reflect.ClassTag
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Utils
 
 /**
  * Serializer for serializing [[UnsafeRow]]s during shuffle. Since UnsafeRows are already stored as
@@ -125,7 +124,7 @@ private class UnsafeRowSerializerInstance(
           if (rowBuffer.length < rowSize) {
             rowBuffer = new Array[Byte](rowSize)
           }
-          ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+          Utils.readFully(dIn, rowBuffer, 0, rowSize)
           row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
           rowSize = readSize()
           if (rowSize == EOF) { // We are returning the last row in this stream
@@ -160,7 +159,7 @@ private class UnsafeRowSerializerInstance(
       if (rowBuffer.length < rowSize) {
         rowBuffer = new Array[Byte](rowSize)
       }
-      ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+      Utils.readFully(dIn, rowBuffer, 0, rowSize)
       row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
       row.asInstanceOf[T]
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 0ba4b1955c82..8f8ec84e2d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
@@ -662,7 +661,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
-          ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+          Utils.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
           keyRow.pointTo(keyRowBuffer, keySize)
 
@@ -672,7 +671,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             map.remove(keyRow)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
-            ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+            Utils.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
             // This is a workaround for the following:
@@ -782,7 +781,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
-          ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+          Utils.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
           keyRow.pointTo(keyRowBuffer, keySize)
 
@@ -793,7 +792,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             fileToRead, toString(), valueSize)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
-            ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+            Utils.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
             // This is a workaround for the following:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index ccb58ed05a6d..9858b2494f84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -21,7 +21,6 @@ import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOExce
 
 import scala.util.control.NonFatal
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.{FSError, Path}
 import org.json4s._
 import org.json4s.jackson.Serialization
@@ -485,14 +484,14 @@ class StateStoreChangelogReaderV1(
     } else {
       // TODO: reuse the key buffer and value buffer across records.
       val keyBuffer = new Array[Byte](keySize)
-      ByteStreams.readFully(input, keyBuffer, 0, keySize)
+      Utils.readFully(input, keyBuffer, 0, keySize)
       val valueSize = input.readInt()
       if (valueSize < 0) {
         // A deletion record
         (RecordType.DELETE_RECORD, keyBuffer, null)
       } else {
         val valueBuffer = new Array[Byte](valueSize)
-        ByteStreams.readFully(input, valueBuffer, 0, valueSize)
+        Utils.readFully(input, valueBuffer, 0, valueSize)
         // A put record.
         (RecordType.PUT_RECORD, keyBuffer, valueBuffer)
       }
@@ -516,7 +515,7 @@ class StateStoreChangelogReaderV2(
   private def parseBuffer(input: DataInputStream): Array[Byte] = {
     val blockSize = input.readInt()
     val blockBuffer = new Array[Byte](blockSize)
-    ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+    Utils.readFully(input, blockBuffer, 0, blockSize)
     blockBuffer
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
index 3d8c20af3b38..a119edf6bdfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
@@ -22,14 +22,13 @@ import java.io.{BufferedWriter, DataInputStream, DataOutputStream, File, FileInp
 import scala.io.Source
 import scala.util.Random
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.util.Utils
 
 /**
  * To run the test suite:
@@ -110,7 +109,7 @@ class StreamingQueryHashPartitionVerifySuite extends StreamTest {
       val rows = (1 to numRows).map { _ =>
         val rowSize = is.readInt()
         val rowBuffer = new Array[Byte](rowSize)
-        ByteStreams.readFully(is, rowBuffer, 0, rowSize)
+        Utils.readFully(is, rowBuffer, 0, rowSize)
         val row = new UnsafeRow(1)
         row.pointTo(rowBuffer, rowSize)
         row

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org