This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f45106286f38 [SPARK-53043][CORE][SQL][K8S] Use Java `transferTo` instead of `IOUtils.copy`
f45106286f38 is described below

commit f45106286f389d3ae966d29cc716274227b0ec24
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 31 12:54:02 2025 -0700

    [SPARK-53043][CORE][SQL][K8S] Use Java `transferTo` instead of `IOUtils.copy`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use Java's `transferTo` instead of `IOUtils.copy`. In addition, a new Scalastyle rule is added to ban `IOUtils.copy` in order to prevent future performance regressions.
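    
    For illustration, a minimal sketch of the replacement pattern (the file paths below are hypothetical, not from this patch):
    
    ```scala
    import java.io.{FileInputStream, FileOutputStream}
    
    val in = new FileInputStream("/tmp/input.bin")
    val out = new FileOutputStream("/tmp/output.bin")
    try {
      // Before: IOUtils.copy(in, out) -- returns an Int and yields -1 past 2GB
      // After: InputStream.transferTo (Java 9+) returns the copied byte count as a Long
      val bytesCopied: Long = in.transferTo(out)
    } finally {
      in.close()
      out.close()
    }
    ```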
    
    ### Why are the changes needed?
    
    Since Java 9, we can use Java's built-in `transferTo` directly, which is **significantly faster (over 100x)** than `IOUtils.copy`. In addition, Java's `transferTo` returns the correct number of copied bytes, while `IOUtils.copy` returns -1 for any copy over 2GB, which is a well-known limitation.
    
    ```scala
    scala> import java.io._
    import java.io._
    
    scala> spark.time(new FileInputStream("/tmp/4G.bin").transferTo(new FileOutputStream("/dev/null")))
    Time taken: 4 ms
    val res0: Long = 4294967296
    
    scala> spark.time(org.apache.commons.io.IOUtils.copy(new FileInputStream("/tmp/4G.bin"), new FileOutputStream("/dev/null")))
    Time taken: 781 ms
    val res1: Int = -1
    ```
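    
    The return type also matters for call sites that accumulate copied sizes; for example, `RocksDBFileManager` adds the result of each copy to a running byte total. Below is a hedged sketch with a hypothetical helper and paths, not code from this patch:
    
    ```scala
    import java.io.{FileInputStream, FileOutputStream}
    
    // Hypothetical helper: copy a file and return the number of bytes copied.
    // IOUtils.copy returns an Int and yields -1 for copies over 2GB;
    // transferTo reports the true size as a Long.
    def copyAndCount(src: String, dst: String): Long = {
      val in = new FileInputStream(src)
      val out = new FileOutputStream(dst)
      try in.transferTo(out) finally {
        in.close()
        out.close()
      }
    }
    ```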
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51751 from dongjoon-hyun/SPARK-53043.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 core/src/main/scala/org/apache/spark/util/Utils.scala                | 3 +--
 core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 3 +--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala    | 3 +--
 core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala       | 4 +---
 .../scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala    | 5 ++---
 scalastyle-config.xml                                                | 5 +++++
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala     | 4 ++--
 7 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3f01b86ba22b..9cf07793ef2a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,6 @@ import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
 import jakarta.ws.rs.core.UriBuilder
 import org.apache.commons.codec.binary.Hex
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
@@ -3074,7 +3073,7 @@ private[spark] object Utils
           val outFile = new File(localDir, fileName)
           files += outFile
           out = new FileOutputStream(outFile)
-          IOUtils.copy(in, out)
+          in.transferTo(out)
           out.close()
           in.closeEntry()
         }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 88bd117ba22b..90eb1eb4ab56 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -24,7 +24,6 @@ import java.nio.channels.WritableByteChannel
 import com.google.common.io.ByteStreams
 import com.google.common.primitives.UnsignedBytes
 import io.netty.handler.stream.ChunkedStream
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
@@ -251,7 +250,7 @@ private[spark] object ChunkedByteBuffer {
     val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
     val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
     Utils.tryWithSafeFinally {
-      IOUtils.copy(in, out)
+      in.transferTo(out)
     } {
       in.close()
       out.close()
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 9cc978df0ee5..867014f235fe 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -21,7 +21,6 @@ import java.io._
 import java.util.zip.GZIPOutputStream
 
 import com.google.common.io.Files
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.{config, MDC}
@@ -92,7 +91,7 @@ private[spark] class RollingFileAppender(
       try {
         inputStream = new FileInputStream(activeFile)
         gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
-        IOUtils.copy(inputStream, gzOutputStream)
+        inputStream.transferTo(gzOutputStream)
         inputStream.close()
         gzOutputStream.close()
         activeFile.delete()
diff --git a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
index cacbb7b5e20b..8899971d89ad 100644
--- a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark
 import java.io.{File, FileInputStream, FileOutputStream}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import org.apache.commons.io.IOUtils
-
 import org.apache.spark.util.Utils
 
 class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
@@ -33,7 +31,7 @@ class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
     val zipOut = new ZipOutputStream(fos)
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
-    IOUtils.copy(fis, zipOut)
+    fis.transferTo(zipOut)
     Utils.closeQuietly(fis)
     Utils.closeQuietly(zipOut)
   }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index fbc600e4d0ca..685d70cbdc52 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -28,7 +28,6 @@ import io.fabric8.kubernetes.client.dsl.ExecListener
 import io.fabric8.kubernetes.client.dsl.ExecListener.Response
 import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
-import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -146,7 +145,7 @@ object Utils extends Logging {
     val zipOut = new ZipOutputStream(fos)
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
-    IOUtils.copy(fis, zipOut)
+    fis.transferTo(zipOut)
     SparkErrorUtils.closeQuietly(fis)
     SparkErrorUtils.closeQuietly(zipOut)
   }
@@ -168,7 +167,7 @@ object Utils extends Logging {
         // to 777.
         tarEntry.setMode(0x81ff)
         tOut.putArchiveEntry(tarEntry)
-        IOUtils.copy(fis, tOut)
+        fis.transferTo(tOut)
         tOut.closeArchiveEntry()
         tOut.finish()
       }
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7d0e99249391..7d345cddc7cc 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -551,4 +551,9 @@ This file is divided into 3 sections:
    <parameters><parameter name="regex">IOUtils\.closeQuietly</parameter></parameters>
    <customMessage>Use closeQuietly of SparkErrorUtils or Utils instead.</customMessage>
   </check>
+
+  <check customId="ioutilscopy" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">IOUtils\.copy\(</parameter></parameters>
+    <customMessage>Use Java transferTo instead.</customMessage>
+  </check>
 </scalastyle>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 2acfaf7499c9..1b31fa48d027 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -30,7 +30,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
-import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.commons.io.FilenameUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.json4s.{Formats, NoTypeHints}
@@ -861,7 +861,7 @@ class RocksDBFileManager(
       files.foreach { file =>
         zout.putNextEntry(new ZipEntry(file.getName))
         in = new FileInputStream(file)
-        val bytes = IOUtils.copy(in, zout)
+        val bytes = in.transferTo(zout)
         in.close()
         zout.closeEntry()
         totalBytes += bytes


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
