This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f45106286f38 [SPARK-53043][CORE][SQL][K8S] Use Java `transferTo` instead of `IOUtils.copy`
f45106286f38 is described below

commit f45106286f389d3ae966d29cc716274227b0ec24
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 31 12:54:02 2025 -0700

[SPARK-53043][CORE][SQL][K8S] Use Java `transferTo` instead of `IOUtils.copy`

### What changes were proposed in this pull request?

This PR aims to use Java `transferTo` instead of `IOUtils.copy`. In addition, a new Scalastyle rule is added to ban `IOUtils.copy` in order to prevent future performance regressions.

### Why are the changes needed?

Since Java 9, we can use Java's built-in `transferTo` directly, which is **significantly faster (over 100x)** than `IOUtils.copy`. In addition, Java's `transferTo` returns the correct number of copied bytes as a `Long`, while `IOUtils.copy` returns an `Int` and yields -1 for copies over 2GB, which is a well-known limitation.

```scala
scala> import java.io._
import java.io._

scala> spark.time(new FileInputStream("/tmp/4G.bin").transferTo(new FileOutputStream("/dev/null")))
Time taken: 4 ms
val res0: Long = 4294967296

scala> spark.time(org.apache.commons.io.IOUtils.copy(new FileInputStream("/tmp/4G.bin"), new FileOutputStream("/dev/null")))
Time taken: 781 ms
val res1: Int = -1
```
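For reference, the mechanical replacement applied across all seven files boils down to the pattern below. This is a minimal illustrative sketch, not code from this patch; the streams and paths are placeholder assumptions.

```scala
import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}

// Placeholder streams for illustration; the patch applies this change to existing call sites.
val in: InputStream = new FileInputStream("/tmp/input.bin")
val out: OutputStream = new FileOutputStream("/tmp/output.bin")
try {
  // Before: IOUtils.copy(in, out) returned an Int, reporting -1 once the copy passed 2GB.
  // After: InputStream.transferTo (Java 9+) returns the exact byte count as a Long.
  val copied: Long = in.transferTo(out)
} finally {
  in.close()
  out.close()
}
```

Call sites that consume the return value, such as the `totalBytes` accumulation in `RocksDBFileManager`, benefit directly because the `Long` count stays accurate beyond 2GB.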
### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51751 from dongjoon-hyun/SPARK-53043.

Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
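The new `ioutilscopy` check added to `scalastyle-config.xml` below follows the same banned-pattern style as the existing `IOUtils.closeQuietly` check visible in the surrounding diff context. As with Spark's other regex bans (for example the `println` check), a deliberate exception could be wrapped in Scalastyle's comment filter; a hypothetical snippet, not used anywhere in this patch:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

val in = new ByteArrayInputStream("data".getBytes)
val out = new ByteArrayOutputStream()

// scalastyle:off ioutilscopy
val n: Int = org.apache.commons.io.IOUtils.copy(in, out) // exempted from the new check
// scalastyle:on ioutilscopy
```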
---
 core/src/main/scala/org/apache/spark/util/Utils.scala                | 3 +--
 core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 3 +--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala    | 3 +--
 core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala       | 4 +---
 .../scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala    | 5 ++---
 scalastyle-config.xml                                                | 5 +++++
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala     | 4 ++--
 7 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3f01b86ba22b..9cf07793ef2a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,6 @@ import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
 import jakarta.ws.rs.core.UriBuilder
 import org.apache.commons.codec.binary.Hex
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
@@ -3074,7 +3073,7 @@ private[spark] object Utils
         val outFile = new File(localDir, fileName)
         files += outFile
         out = new FileOutputStream(outFile)
-        IOUtils.copy(in, out)
+        in.transferTo(out)
         out.close()
         in.closeEntry()
       }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 88bd117ba22b..90eb1eb4ab56 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -24,7 +24,6 @@ import java.nio.channels.WritableByteChannel
 import com.google.common.io.ByteStreams
 import com.google.common.primitives.UnsignedBytes
 import io.netty.handler.stream.ChunkedStream
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
@@ -251,7 +250,7 @@ private[spark] object ChunkedByteBuffer {
     val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
     val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
     Utils.tryWithSafeFinally {
-      IOUtils.copy(in, out)
+      in.transferTo(out)
     } {
       in.close()
       out.close()
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 9cc978df0ee5..867014f235fe 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -21,7 +21,6 @@ import java.io._
 import java.util.zip.GZIPOutputStream
 
 import com.google.common.io.Files
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.{config, MDC}
@@ -92,7 +91,7 @@ private[spark] class RollingFileAppender(
     try {
       inputStream = new FileInputStream(activeFile)
       gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
-      IOUtils.copy(inputStream, gzOutputStream)
+      inputStream.transferTo(gzOutputStream)
       inputStream.close()
       gzOutputStream.close()
       activeFile.delete()
diff --git a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
index cacbb7b5e20b..8899971d89ad 100644
--- a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark
 import java.io.{File, FileInputStream, FileOutputStream}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import org.apache.commons.io.IOUtils
-
 import org.apache.spark.util.Utils
 
 class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
@@ -33,7 +31,7 @@ class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
     val zipOut = new ZipOutputStream(fos)
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
-    IOUtils.copy(fis, zipOut)
+    fis.transferTo(zipOut)
     Utils.closeQuietly(fis)
     Utils.closeQuietly(zipOut)
   }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index fbc600e4d0ca..685d70cbdc52 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -28,7 +28,6 @@ import io.fabric8.kubernetes.client.dsl.ExecListener
 import io.fabric8.kubernetes.client.dsl.ExecListener.Response
 import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
-import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -146,7 +145,7 @@ object Utils extends Logging {
     val zipOut = new ZipOutputStream(fos)
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
-    IOUtils.copy(fis, zipOut)
+    fis.transferTo(zipOut)
     SparkErrorUtils.closeQuietly(fis)
     SparkErrorUtils.closeQuietly(zipOut)
   }
@@ -168,7 +167,7 @@ object Utils extends Logging {
       // to 777.
       tarEntry.setMode(0x81ff)
       tOut.putArchiveEntry(tarEntry)
-      IOUtils.copy(fis, tOut)
+      fis.transferTo(tOut)
       tOut.closeArchiveEntry()
       tOut.finish()
     }
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7d0e99249391..7d345cddc7cc 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -551,4 +551,9 @@ This file is divided into 3 sections:
     <parameters><parameter name="regex">IOUtils\.closeQuietly</parameter></parameters>
     <customMessage>Use closeQuietly of SparkErrorUtils or Utils instead.</customMessage>
   </check>
+
+  <check customId="ioutilscopy" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">IOUtils\.copy\(</parameter></parameters>
+    <customMessage>Use Java transferTo instead.</customMessage>
+  </check>
 </scalastyle>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 2acfaf7499c9..1b31fa48d027 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -30,7 +30,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
-import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.commons.io.FilenameUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.json4s.{Formats, NoTypeHints}
@@ -861,7 +861,7 @@ class RocksDBFileManager(
       files.foreach { file =>
         zout.putNextEntry(new ZipEntry(file.getName))
         in = new FileInputStream(file)
-        val bytes = IOUtils.copy(in, zout)
+        val bytes = in.transferTo(zout)
         in.close()
         zout.closeEntry()
         totalBytes += bytes

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org