This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02ba89f02993 [SPARK-54134][SQL] Optimize Arrow memory usage
02ba89f02993 is described below

commit 02ba89f0299314decc6f7afc3603a7d7e3bdc019
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Nov 3 13:07:38 2025 -0800

    [SPARK-54134][SQL] Optimize Arrow memory usage
    
    ### What changes were proposed in this pull request?
    
    This patch proposes some changes to optimize memory usage on Arrow in 
Spark. It compresses Arrow IPC data when serializing.
    
    ### Why are the changes needed?
    
    We have encountered OOM errors when loading data and processing it in PySpark 
through `toArrow` or `toPandas`. The same data can be loaded by PyArrow 
directly, but fails to load through `toArrow` or `toPandas` in PySpark due to 
OOM issues.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests. Also manually tested locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code v2.0.13
    
    Closes #52747 from viirya/release_buffers.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 dev/deps/spark-deps-hadoop-3-hive-2.3              |  1 +
 pom.xml                                            | 19 ++++++
 python/pyspark/sql/tests/arrow/test_arrow.py       | 75 ++++++++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 +++++
 sql/core/pom.xml                                   |  4 ++
 .../sql/execution/arrow/ArrowConverters.scala      | 24 ++++++-
 6 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 5c4c053293e0..73f36fb161c6 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -15,6 +15,7 @@ antlr4-runtime/4.13.1//antlr4-runtime-4.13.1.jar
 aopalliance-repackaged/3.0.6//aopalliance-repackaged-3.0.6.jar
 arpack/3.0.4//arpack-3.0.4.jar
 arpack_combined_all/0.1//arpack_combined_all-0.1.jar
+arrow-compression/18.3.0//arrow-compression-18.3.0.jar
 arrow-format/18.3.0//arrow-format-18.3.0.jar
 arrow-memory-core/18.3.0//arrow-memory-core-18.3.0.jar
 
arrow-memory-netty-buffer-patch/18.3.0//arrow-memory-netty-buffer-patch-18.3.0.jar
diff --git a/pom.xml b/pom.xml
index 5a62e104557b..95bc00ab7d46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2519,6 +2519,25 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.arrow</groupId>
+        <artifactId>arrow-compression</artifactId>
+        <version>${arrow.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>org.apache.arrow</groupId>
         <artifactId>arrow-memory-netty</artifactId>
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py 
b/python/pyspark/sql/tests/arrow/test_arrow.py
index be7dd2febc94..af08f8c8c101 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -1810,6 +1810,81 @@ class ArrowTestsMixin:
         df = self.spark.createDataFrame(t)
         self.assertIsInstance(df.schema["fsl"].dataType, ArrayType)
 
+    def test_toPandas_with_compression_codec(self):
+        # Test toPandas() with different compression codec settings
+        df = self.spark.createDataFrame(self.data, schema=self.schema)
+        expected = self.create_pandas_data_frame()
+
+        for codec in ["none", "zstd", "lz4"]:
+            with self.subTest(compressionCodec=codec):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                    pdf = df.toPandas()
+                    assert_frame_equal(expected, pdf)
+
+    def test_toArrow_with_compression_codec(self):
+        # Test toArrow() with different compression codec settings
+        import pyarrow.compute as pc
+
+        t_in = self.create_arrow_table()
+
+        # Convert timezone-naive local timestamp column in input table to UTC
+        # to enable comparison to UTC timestamp column in output table
+        timezone = self.spark.conf.get("spark.sql.session.timeZone")
+        t_in = t_in.set_column(
+            t_in.schema.get_field_index("8_timestamp_t"),
+            "8_timestamp_t",
+            pc.assume_timezone(t_in["8_timestamp_t"], timezone),
+        )
+        t_in = t_in.cast(
+            t_in.schema.set(
+                t_in.schema.get_field_index("8_timestamp_t"),
+                pa.field("8_timestamp_t", pa.timestamp("us", tz="UTC")),
+            )
+        )
+
+        df = self.spark.createDataFrame(self.data, schema=self.schema)
+
+        for codec in ["none", "zstd", "lz4"]:
+            with self.subTest(compressionCodec=codec):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                    t_out = df.toArrow()
+                    self.assertTrue(t_out.equals(t_in))
+
+    def test_toPandas_with_compression_codec_large_dataset(self):
+        # Test compression with a larger dataset to verify memory savings
+        # Create a dataset with repetitive data that compresses well
+        from pyspark.sql.functions import lit, col
+
+        df = self.spark.range(10000).select(
+            col("id"),
+            lit("test_string_value_" * 10).alias("str_col"),
+            (col("id") % 100).alias("mod_col"),
+        )
+
+        for codec in ["none", "zstd", "lz4"]:
+            with self.subTest(compressionCodec=codec):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                    pdf = df.toPandas()
+                    self.assertEqual(len(pdf), 10000)
+                    self.assertEqual(pdf.columns.tolist(), ["id", "str_col", 
"mod_col"])
+
+    def test_toArrow_with_compression_codec_large_dataset(self):
+        # Test compression with a larger dataset for toArrow
+        from pyspark.sql.functions import lit, col
+
+        df = self.spark.range(10000).select(
+            col("id"),
+            lit("test_string_value_" * 10).alias("str_col"),
+            (col("id") % 100).alias("mod_col"),
+        )
+
+        for codec in ["none", "zstd", "lz4"]:
+            with self.subTest(compressionCodec=codec):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                    t = df.toArrow()
+                    self.assertEqual(t.num_rows, 10000)
+                    self.assertEqual(t.column_names, ["id", "str_col", 
"mod_col"])
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d88cbe326cfb..b8907629ad37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3987,6 +3987,20 @@ object SQLConf {
           "than zero and less than INT_MAX.")
       .createWithDefaultString("64MB")
 
+  val ARROW_EXECUTION_COMPRESSION_CODEC =
+    buildConf("spark.sql.execution.arrow.compressionCodec")
+      .doc("Compression codec used to compress Arrow IPC data when 
transferring data " +
+        "between JVM and Python processes (e.g., toPandas, toArrow). This can 
significantly " +
+        "reduce memory usage and network bandwidth when transferring large 
datasets. " +
+        "Supported codecs: 'none' (no compression), 'zstd' (Zstandard), 'lz4' 
(LZ4). " +
+        "Note that compression may add CPU overhead but can provide 
substantial memory savings " +
+        "especially for datasets with high compression ratios.")
+      .version("4.1.0")
+      .stringConf
+      .transform(_.toLowerCase(java.util.Locale.ROOT))
+      .checkValues(Set("none", "zstd", "lz4"))
+      .createWithDefault("none")
+
   val ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH =
     
buildConf("spark.sql.execution.arrow.transformWithStateInPySpark.maxStateRecordsPerBatch")
       .doc("When using TransformWithState in PySpark (both Python Row and 
Pandas), limit " +
@@ -7332,6 +7346,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def arrowMaxBytesPerBatch: Long = 
getConf(ARROW_EXECUTION_MAX_BYTES_PER_BATCH)
 
+  def arrowCompressionCodec: String = 
getConf(ARROW_EXECUTION_COMPRESSION_CODEC)
+
   def arrowTransformWithStateInPySparkMaxStateRecordsPerBatch: Int =
     getConf(ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH)
 
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index f0ad904416f1..6b9d08f0dde7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -279,6 +279,10 @@
       <artifactId>bcpkix-jdk18on</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-compression</artifactId>
+    </dependency>
   </dependencies>
   <build>
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 3072a12e3d58..7f260bd2efd0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -23,9 +23,11 @@ import java.nio.channels.{Channels, ReadableByteChannel}
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
+import org.apache.arrow.compression.{Lz4CompressionCodec, ZstdCompressionCodec}
 import org.apache.arrow.flatbuf.MessageHeader
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector._
+import org.apache.arrow.vector.compression.{CompressionCodec, 
NoCompressionCodec}
 import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter, 
ReadChannel, WriteChannel}
 import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, 
MessageSerializer}
 
@@ -37,6 +39,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, 
ColumnVector}
@@ -92,8 +95,25 @@ private[sql] object ArrowConverters extends Logging {
       ArrowUtils.rootAllocator.newChildAllocator(
         s"to${this.getClass.getSimpleName}", 0, Long.MaxValue)
 
-    private val root = VectorSchemaRoot.create(arrowSchema, allocator)
-    protected val unloader = new VectorUnloader(root)
+    protected val root = VectorSchemaRoot.create(arrowSchema, allocator)
+
+    // Create compression codec based on config
+    private val compressionCodecName = SQLConf.get.arrowCompressionCodec
+    private val codec = compressionCodecName match {
+      case "none" => NoCompressionCodec.INSTANCE
+      case "zstd" =>
+        val factory = CompressionCodec.Factory.INSTANCE
+        val codecType = new ZstdCompressionCodec().getCodecType()
+        factory.createCodec(codecType)
+      case "lz4" =>
+        val factory = CompressionCodec.Factory.INSTANCE
+        val codecType = new Lz4CompressionCodec().getCodecType()
+        factory.createCodec(codecType)
+      case other =>
+        throw new IllegalArgumentException(
+          s"Unsupported Arrow compression codec: $other. Supported values: 
none, zstd, lz4")
+    }
+    protected val unloader = new VectorUnloader(root, true, codec, true)
     protected val arrowWriter = ArrowWriter.create(root)
 
     Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to