This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d2878b61bd [SPARK-53188][CORE][SQL] Support `readFully` in `SparkStreamUtils` and `JavaUtils`
76d2878b61bd is described below

commit 76d2878b61bd0be878f30aff230b01cd0d5c96cb
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Aug 7 21:19:04 2025 -0700

    [SPARK-53188][CORE][SQL] Support `readFully` in `SparkStreamUtils` and `JavaUtils`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `readFully` in `SparkStreamUtils` and `JavaUtils`, built on top of the Java 9+ `readNBytes` API.
    
    ```java
    public static void readFully(InputStream in, byte[] arr, int off, int len) throws IOException {
      if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
        throw new IllegalArgumentException("Invalid input argument");
      }
      if (len != in.readNBytes(arr, off, len)) {
        throw new EOFException("Fail to read " + len + " bytes.");
      }
    }
    ```
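
    As an illustration only (not part of the patch), here is a minimal Scala sketch of how the new helper behaves; `JavaUtils.readFully` comes from this change, while the input data is made up for the example:

    ```scala
    import java.io.{ByteArrayInputStream, EOFException}
    import org.apache.spark.network.util.JavaUtils

    // Reads exactly 3 bytes into buf starting at offset 0.
    val in = new ByteArrayInputStream(Array[Byte](1, 2, 3))
    val buf = new Array[Byte](3)
    JavaUtils.readFully(in, buf, 0, 3)

    // A stream that ends early causes an EOFException instead of a silent short read.
    val truncated = new ByteArrayInputStream(Array[Byte](1))
    try {
      JavaUtils.readFully(truncated, new Array[Byte](4), 0, 4)
    } catch {
      case e: EOFException => println(e.getMessage) // "Fail to read 4 bytes."
    }
    ```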
    
    ### Why are the changes needed?

    This allows Spark to replace Guava's `ByteStreams.readFully` at the existing call sites with its own utilities, for example:

    ```scala
    - ByteStreams.readFully(is, rowBuffer, 0, rowSize)
    + Utils.readFully(is, rowBuffer, 0, rowSize)
    ```
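
    For context, the migrated call sites below follow a common pattern; a hypothetical length-prefixed read looks like this after the change (the helper and its names are illustrative only, not from the patch):

    ```scala
    import java.io.DataInputStream
    import org.apache.spark.util.Utils

    // Illustrative helper: read a length-prefixed record fully into a
    // fresh buffer, as the migrated call sites do.
    def readRecord(is: DataInputStream): Array[Byte] = {
      val rowSize = is.readInt()
      val rowBuffer = new Array[Byte](rowSize)
      Utils.readFully(is, rowBuffer, 0, rowSize) // previously ByteStreams.readFully(is, rowBuffer, 0, rowSize)
      rowBuffer
    }
    ```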
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51915 from dongjoon-hyun/SPARK-53188.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/network/shuffle/ExternalBlockHandlerSuite.java     |  3 +--
 .../main/java/org/apache/spark/network/util/JavaUtils.java   | 12 ++++++++++++
 .../main/scala/org/apache/spark/util/SparkStreamUtils.scala  |  5 +++++
 .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java |  4 ++--
 .../scala/org/apache/spark/security/CryptoStreamUtils.scala  |  3 +--
 core/src/main/scala/org/apache/spark/util/Utils.scala        |  2 +-
 .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala   |  5 ++---
 .../scala/org/apache/spark/io/CompressionCodecSuite.scala    |  5 ++---
 .../spark/storage/ShuffleBlockFetcherIteratorSuite.scala     |  3 +--
 dev/checkstyle.xml                                           |  4 ++++
 scalastyle-config.xml                                        |  5 +++++
 .../org/apache/spark/sql/execution/UnsafeRowSerializer.scala |  7 +++----
 .../streaming/state/HDFSBackedStateStoreProvider.scala       |  9 ++++-----
 .../sql/execution/streaming/state/StateStoreChangelog.scala  |  7 +++----
 .../streaming/StreamingQueryHashPartitionVerifySuite.scala   |  5 ++---
 15 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index f7edc8837fde..2a3135e3c8ae 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.Timer;
-import com.google.common.io.ByteStreams;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -136,7 +135,7 @@ public class ExternalBlockHandlerSuite {
       CheckedInputStream checkedIn = new CheckedInputStream(
         blockMarkers[0].createInputStream(), checksum);
       byte[] buffer = new byte[10];
-      ByteStreams.readFully(checkedIn, buffer, 0, (int) blockMarkers[0].size());
+      JavaUtils.readFully(checkedIn, buffer, 0, (int) blockMarkers[0].size());
       long checksumByWriter = checkedIn.getChecksum().getValue();
 
       // when checksumByWriter == checksumRecalculated and checksumByReader != checksumByWriter
diff --git a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 9e2db5250566..3a2485520c66 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -610,6 +610,18 @@ public class JavaUtils {
     }
   }
 
+  /**
+   * Read len bytes exactly, otherwise throw exceptions.
+   */
+  public static void readFully(InputStream in, byte[] arr, int off, int len) throws IOException {
+    if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
+      throw new IllegalArgumentException("Invalid input argument");
+    }
+    if (len != in.readNBytes(arr, off, len)) {
+      throw new EOFException("Fail to read " + len + " bytes.");
+    }
+  }
+
   /**
    * Copy the content of a URL into a file.
    */
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
index e1a1c98987e8..d4e44b019120 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
@@ -20,6 +20,7 @@ import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}
 import java.nio.channels.{FileChannel, WritableByteChannel}
 import java.nio.charset.StandardCharsets
 
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally
 
 private[spark] trait SparkStreamUtils {
@@ -109,6 +110,10 @@ private[spark] trait SparkStreamUtils {
   def toString(in: InputStream): String = {
     new String(in.readAllBytes(), StandardCharsets.UTF_8)
   }
+
+  def readFully(in: InputStream, arr: Array[Byte], off: Int, len: Int): Unit = {
+    JavaUtils.readFully(in, arr, off, len)
+  }
 }
 
 private [spark] object SparkStreamUtils extends SparkStreamUtils
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 0693f8cb1a80..6674fcd73b92 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
@@ -27,6 +26,7 @@ import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
@@ -128,7 +128,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       arr = new byte[recordLength];
       baseObject = arr;
     }
-    ByteStreams.readFully(in, arr, 0, recordLength);
+    JavaUtils.readFully(in, arr, 0, recordLength);
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
       close();
diff --git a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 8bdb80d65207..b230df93e1fa 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -26,7 +26,6 @@ import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.ByteStreams
 import org.apache.commons.crypto.random._
 import org.apache.commons.crypto.stream._
 
@@ -84,7 +83,7 @@ private[spark] object CryptoStreamUtils extends Logging {
       sparkConf: SparkConf,
       key: Array[Byte]): InputStream = {
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    ByteStreams.readFully(is, iv)
+    JavaUtils.readFully(is, iv, 0, IV_LENGTH_IN_BYTES)
     val params = new CryptoParams(key, sparkConf)
     new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
       new IvParameterSpec(iv))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fc9375025881..57f70489a860 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1589,7 +1589,7 @@ private[spark] object Utils
 
     try {
       stream.skipNBytes(effectiveStart)
-      ByteStreams.readFully(stream, buff)
+      readFully(stream, buff, 0, buff.length)
     } finally {
       stream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 68b181de2928..992bb37f44a0 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
@@ -145,7 +144,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
 
     val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
     val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
-    ByteStreams.readFully(inputStream, bytesFromStream)
+    Utils.readFully(inputStream, bytesFromStream, 0, bytesFromStream.length)
     assert(bytesFromStream === bytes1.array() ++ bytes2.array())
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ff971b72d891..d6f0bfd237e4 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,10 +20,9 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.util.Locale
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.{SparkConf, SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
+import org.apache.spark.util.Utils
 
 class CompressionCodecSuite extends SparkFunSuite {
   val conf = new SparkConf(false)
@@ -158,7 +157,7 @@ class CompressionCodecSuite extends SparkFunSuite {
     }
     val concatenatedBytes = codec.compressedInputStream(new ByteArrayInputStream(bytes1 ++ bytes2))
     val decompressed: Array[Byte] = new Array[Byte](128)
-    ByteStreams.readFully(concatenatedBytes, decompressed)
+    Utils.readFully(concatenatedBytes, decompressed, 0, decompressed.length)
     assert(decompressed.toSeq === (0 to 127))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 3de78407ca16..211de2e8729e 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -29,7 +29,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
 // scalastyle:on executioncontextglobal
 import scala.concurrent.Future
 
-import com.google.common.io.ByteStreams
 import io.netty.util.internal.OutOfDirectMemoryError
 import org.apache.logging.log4j.Level
 import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -289,7 +288,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
       intercept[FetchFailedException] {
         val inputStream = iterator.next()._2
         // Consume the data to trigger the corruption
-        ByteStreams.readFully(inputStream, new Array[Byte](100))
+        Utils.readFully(inputStream, new Array[Byte](100), 0, 100)
       }
       // The block will be fetched only once because corruption can't be detected in
       // maxBytesInFlight/3 of the data size
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 78f02b957b47..f7c4801c9e6f 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -213,6 +213,10 @@
             <property name="format" value="ByteStreams\.skipFully"/>
             <property name="message" value="Use Java skipNBytes instead." />
         </module>
+        <module name="RegexpSinglelineJava">
+            <property name="format" value="ByteStreams\.readFully"/>
+            <property name="message" value="Use readFully of JavaUtils/SparkStreamUtils/Utils instead." />
+        </module>
         <module name="RegexpSinglelineJava">
             <property name="format" value="FileUtils.writeStringToFile"/>
             <property name="message" value="Use java.nio.file.Files.writeString instead." />
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 36c31c8e3187..3b3dce52b36c 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -737,6 +737,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java `skipNBytes` instead.</customMessage>
   </check>
 
+  <check customId="readFully" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bByteStreams\.readFully\b</parameter></parameters>
+    <customMessage>Use readFully of JavaUtils/SparkStreamUtils/Utils instead.</customMessage>
+  </check>
+
   <check customId="maputils" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">org\.apache\.commons\.collections4\.MapUtils\b</parameter></parameters>
     <customMessage>Use org.apache.spark.util.collection.Utils instead.</customMessage>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 42fcfa8d60fa..9728d664998e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -22,13 +22,12 @@ import java.nio.ByteBuffer
 
 import scala.reflect.ClassTag
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Utils
 
 /**
  * Serializer for serializing [[UnsafeRow]]s during shuffle. Since UnsafeRows are already stored as
@@ -125,7 +124,7 @@ private class UnsafeRowSerializerInstance(
             if (rowBuffer.length < rowSize) {
               rowBuffer = new Array[Byte](rowSize)
             }
-            ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+            Utils.readFully(dIn, rowBuffer, 0, rowSize)
             row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
             rowSize = readSize()
             if (rowSize == EOF) { // We are returning the last row in this stream
@@ -160,7 +159,7 @@ private class UnsafeRowSerializerInstance(
         if (rowBuffer.length < rowSize) {
           rowBuffer = new Array[Byte](rowSize)
         }
-        ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+        Utils.readFully(dIn, rowBuffer, 0, rowSize)
         row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
         row.asInstanceOf[T]
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 0ba4b1955c82..8f8ec84e2d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
@@ -662,7 +661,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
-          ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+          Utils.readFully(input, keyRowBuffer, 0, keySize)
 
           val keyRow = new UnsafeRow(keySchema.fields.length)
           keyRow.pointTo(keyRowBuffer, keySize)
@@ -672,7 +671,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             map.remove(keyRow)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
-            ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+            Utils.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
             // This is a workaround for the following:
@@ -782,7 +781,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
             fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
-          ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+          Utils.readFully(input, keyRowBuffer, 0, keySize)
 
           val keyRow = new UnsafeRow(keySchema.fields.length)
           keyRow.pointTo(keyRowBuffer, keySize)
@@ -793,7 +792,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
               fileToRead, toString(), valueSize)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
-            ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+            Utils.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
             // This is a workaround for the following:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index ccb58ed05a6d..9858b2494f84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -21,7 +21,6 @@ import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOExce
 
 import scala.util.control.NonFatal
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.{FSError, Path}
 import org.json4s._
 import org.json4s.jackson.Serialization
@@ -485,14 +484,14 @@ class StateStoreChangelogReaderV1(
     } else {
       // TODO: reuse the key buffer and value buffer across records.
       val keyBuffer = new Array[Byte](keySize)
-      ByteStreams.readFully(input, keyBuffer, 0, keySize)
+      Utils.readFully(input, keyBuffer, 0, keySize)
       val valueSize = input.readInt()
       if (valueSize < 0) {
         // A deletion record
         (RecordType.DELETE_RECORD, keyBuffer, null)
       } else {
         val valueBuffer = new Array[Byte](valueSize)
-        ByteStreams.readFully(input, valueBuffer, 0, valueSize)
+        Utils.readFully(input, valueBuffer, 0, valueSize)
         // A put record.
         (RecordType.PUT_RECORD, keyBuffer, valueBuffer)
       }
@@ -516,7 +515,7 @@ class StateStoreChangelogReaderV2(
   private def parseBuffer(input: DataInputStream): Array[Byte] = {
     val blockSize = input.readInt()
     val blockBuffer = new Array[Byte](blockSize)
-    ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+    Utils.readFully(input, blockBuffer, 0, blockSize)
     blockBuffer
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
index 3d8c20af3b38..a119edf6bdfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
@@ -22,14 +22,13 @@ import java.io.{BufferedWriter, DataInputStream, DataOutputStream, File, FileInp
 import scala.io.Source
 import scala.util.Random
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.util.Utils
 
 /**
  * To run the test suite:
@@ -110,7 +109,7 @@ class StreamingQueryHashPartitionVerifySuite extends StreamTest {
     val rows = (1 to numRows).map { _ =>
       val rowSize = is.readInt()
       val rowBuffer = new Array[Byte](rowSize)
-      ByteStreams.readFully(is, rowBuffer, 0, rowSize)
+      Utils.readFully(is, rowBuffer, 0, rowSize)
       val row = new UnsafeRow(1)
       row.pointTo(rowBuffer, rowSize)
       row


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
