This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a54fff253ca3 [SPARK-55802][SQL] Fix integer overflow when computing
Arrow batch bytes
a54fff253ca3 is described below
commit a54fff253ca3aa8da282f3970839a693a7b2bb89
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Mar 4 08:21:38 2026 -0800
[SPARK-55802][SQL] Fix integer overflow when computing Arrow batch bytes
### What changes were proposed in this pull request?
`ArrowWriter.sizeInBytes()` and `SliceBytesArrowOutputProcessorImpl
.getBatchBytes()` both accumulated per-column buffer sizes (each an `Int`) into
an `Int` accumulator. When the total exceeds 2 GB the sum silently wraps
negative, causing the byte-limit checks controlled by
`spark.sql.execution.arrow.maxBytesPerBatch` and
`spark.sql.execution.arrow.maxBytesPerOutputBatch` to behave incorrectly
and potentially allow oversized batches through.
Fix by changing both accumulators and return types to `Long`.
### Why are the changes needed?
Fix possible overflow when calculating Arrow batch bytes.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Closes #54584 from viirya/fix-arrow-batch-bytes-overflow.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit df195ac59de5bd896cd70699cfe96ebf78bf2976)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 4 ++--
.../org/apache/spark/sql/execution/python/PythonArrowOutput.scala | 4 ++--
.../execution/python/streaming/BaseStreamingArrowWriterSuite.scala | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 8d68e74dbf87..b5269da035f3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -124,9 +124,9 @@ class ArrowWriter(val root: VectorSchemaRoot, fields:
Array[ArrowFieldWriter]) {
count += 1
}
- def sizeInBytes(): Int = {
+ def sizeInBytes(): Long = {
var i = 0
- var bytes = 0
+ var bytes = 0L
while (i < fields.size) {
bytes += fields(i).getSizeInBytes()
i += 1
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index df458fa37d7f..1e8f4ebfd1fe 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -288,8 +288,8 @@ class SliceBytesArrowOutputProcessorImpl(
}
}
- private def getBatchBytes(root: VectorSchemaRoot): Int = {
- var batchBytes = 0
+ private def getBatchBytes(root: VectorSchemaRoot): Long = {
+ var batchBytes = 0L
root.getFieldVectors.asScala.foreach { vector =>
batchBytes += vector.getBufferSize
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
index fc10a102b4f5..4b44a923fd14 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
@@ -76,7 +76,7 @@ class BaseStreamingArrowWriterSuite extends SparkFunSuite
with BeforeAndAfterEac
()
}
- when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter }
+ when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter.toLong }
// Set arrowMaxBytesPerBatch to 1
transformWithStateInPySparkWriter = new BaseStreamingArrowWriter(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]