This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4da26e4bf9ea [SPARK-55683][SQL][FOLLOWUP] Optimize `VectorizedPlainValuesReader.readUnsignedLongs` to reuse scratch buffer and avoid per-element allocations
4da26e4bf9ea is described below
commit 4da26e4bf9eab119ff2489c5fdf85efe60f6f469
Author: yangjie01 <[email protected]>
AuthorDate: Thu Feb 26 14:11:12 2026 -0800
[SPARK-55683][SQL][FOLLOWUP] Optimize `VectorizedPlainValuesReader.readUnsignedLongs` to reuse scratch buffer and avoid per-element allocations
### What changes were proposed in this pull request?
Following the suggestion from Copilot in
https://github.com/apache/spark/pull/54479#pullrequestreview-3855142296,
this PR further optimizes `VectorizedPlainValuesReader.readUnsignedLongs` by
introducing a reusable scratch buffer that eliminates the per-element `byte[]`
allocations introduced in the previous refactoring.
The previous implementation allocated a new `byte[]` per element for the
encoded output:
```java
// Previous: new byte[totalLen] per element, plus new byte[]{0} for zero values
byte[] dest = new byte[totalLen];
...
c.putByteArray(rowId, dest, 0, totalLen);
```
The new implementation allocates a single `byte[9]` scratch buffer once per
batch and reuses it across all elements. Since
`WritableColumnVector.putByteArray` copies the bytes into its internal storage
immediately, the scratch buffer can be safely overwritten on the next iteration:
```java
// New: one byte[9] allocated per batch, reused for every element
byte[] scratch = new byte[9];
for (...) {
putLittleEndianBytesAsBigInteger(c, rowId, src, offset, scratch);
}
```
The scratch buffer is sized at 9 bytes to accommodate the worst case: 1
`0x00` sign byte + 8 value bytes. The zero value special case is also handled
via scratch, avoiding the previous `new byte[]{0}` allocation.
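The encoding described above can be sketched as a standalone version outside Spark. The class and method names below are illustrative, not Spark's actual code; the helper mirrors the minimal big-endian encoding (strip leading zero bytes, prepend a `0x00` sign byte when the top bit is set, special-case zero) and each result is checked against `BigInteger`'s own unsigned parsing:

```java
import java.math.BigInteger;
import java.util.Arrays;

// Standalone sketch (illustrative names, not Spark's actual code) of the
// scratch-buffer encoding: convert a little-endian unsigned 64-bit value
// into the minimal big-endian byte form BigInteger expects, using a
// reusable 9-byte scratch (1 optional 0x00 sign byte + up to 8 value bytes).
public class UnsignedLongEncodeSketch {

  // Writes the minimal big-endian encoding into scratch; returns its length.
  static int encodeMinimalBigEndian(byte[] src, int offset, byte[] scratch) {
    // src is little-endian; the most significant byte is at src[offset + 7].
    int msbIndex = offset + 7;
    while (msbIndex > offset && src[msbIndex] == 0) {
      msbIndex--;
    }
    // Zero special case: BigInteger rejects a zero-length byte array.
    if (msbIndex == offset && src[offset] == 0) {
      scratch[0] = 0x00;
      return 1;
    }
    // If the top bit of the most significant byte is set, prepend a 0x00
    // sign byte so BigInteger interprets the value as non-negative.
    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
    int pos = 0;
    if (needSignByte) {
      scratch[pos++] = 0x00;
    }
    for (int i = msbIndex; i >= offset; i--) {
      scratch[pos++] = src[i];  // reverse: little-endian -> big-endian
    }
    return pos;
  }

  // Lays out `value` little-endian, encodes it, and decodes via BigInteger.
  static BigInteger decode(long value, byte[] scratch) {
    byte[] src = new byte[8];
    for (int i = 0; i < 8; i++) {
      src[i] = (byte) (value >>> (8 * i));  // little-endian byte layout
    }
    int len = encodeMinimalBigEndian(src, 0, scratch);
    return new BigInteger(Arrays.copyOfRange(scratch, 0, len));
  }

  public static void main(String[] args) {
    byte[] scratch = new byte[9];  // allocated once, reused per value
    long[] samples = {0L, 1L, 255L, 1L << 40, -1L};  // -1L is 2^64-1 unsigned
    for (long v : samples) {
      BigInteger expected = new BigInteger(Long.toUnsignedString(v));
      if (!decode(v, scratch).equals(expected)) {
        throw new AssertionError("mismatch for " + Long.toUnsignedString(v));
      }
    }
    System.out.println("all samples match");
  }
}
```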
### Why are the changes needed?
The previous implementation still allocated one `byte[]` per element for
the encoded output. For a typical batch of 4096 values this means 4096 heap
allocations per `readUnsignedLongs` call, creating GC pressure in workloads
that read large `UINT_64` columns. With the scratch-buffer approach, the
entire batch performs only 2 allocations regardless of batch size: the
`byte[9]` scratch and the `byte[8]` read buffer used by the direct-buffer
fallback.
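The reuse is safe only because `putByteArray` copies the bytes immediately. A toy copy-on-put column (a hypothetical stand-in, not Spark's actual `WritableColumnVector`) illustrates why overwriting the scratch buffer on the next iteration cannot corrupt rows that were already stored:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (not Spark's actual WritableColumnVector) showing why reusing
// one scratch buffer is safe: the column copies the bytes on put, so
// overwriting scratch afterwards cannot affect previously stored rows.
public class ScratchReuseSketch {

  // Minimal stand-in for a byte-array column that copies on put.
  static final class CopyingColumn {
    private final List<byte[]> rows = new ArrayList<>();

    void putByteArray(byte[] src, int offset, int length) {
      // Defensive copy, analogous to copying into arrayData().
      rows.add(Arrays.copyOfRange(src, offset, offset + length));
    }

    byte[] get(int rowId) {
      return rows.get(rowId);
    }
  }

  public static void main(String[] args) {
    CopyingColumn col = new CopyingColumn();
    byte[] scratch = new byte[9];   // one allocation for the whole batch
    for (int v = 0; v < 3; v++) {
      scratch[0] = (byte) v;        // scratch is overwritten each iteration
      col.putByteArray(scratch, 0, 1);
    }
    // Each row kept its own copy despite the scratch reuse.
    for (int v = 0; v < 3; v++) {
      if (col.get(v)[0] != (byte) v) throw new AssertionError();
    }
    System.out.println("rows unaffected by scratch reuse");
  }
}
```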
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Reused the JMH benchmark provided in
https://github.com/apache/spark/pull/54479, and the test results are as follows:
Java 17
```
[info] Benchmark                                                               (numValues)  Mode  Cnt       Score      Error  Units
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New      10000000  avgt   10  233820.658 ± 1888.523  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old      10000000  avgt   10  255563.248 ± 3500.165  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New       10000000  avgt   10  228672.684 ± 2985.496  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old       10000000  avgt   10  275756.804 ± 2065.405  us/op
```
Java 21
```
[info] Benchmark                                                               (numValues)  Mode  Cnt       Score       Error  Units
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New      10000000  avgt   10  241977.924 ± 15125.343  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old      10000000  avgt   10  250343.470 ±  1342.509  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New       10000000  avgt   10  212929.948 ±  1387.671  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old       10000000  avgt   10  274561.949 ±  1226.348  us/op
```
Based on these results, the onHeap path improves by roughly 17-22% and the
offHeap path by roughly 3-9% across Java 17 and Java 21.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude Sonnet 4.6 was used to assist with writing the code.
Closes #54510 from LuciferYang/SPARK-55683-FOLLOWUP.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../parquet/VectorizedPlainValuesReader.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9ad07a1ff5ee..852d02c2cdf1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -198,18 +198,20 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
public final void readUnsignedLongs(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 8;
ByteBuffer buffer = getBuffer(requiredBytes);
+ // scratch buffer: max 9 bytes (0x00 sign byte + 8 value bytes), reused per batch
+ byte[] scratch = new byte[9];
if (buffer.hasArray()) {
byte[] src = buffer.array();
int offset = buffer.arrayOffset() + buffer.position();
for (int i = 0; i < total; i++, rowId++, offset += 8) {
- putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
+ putLittleEndianBytesAsBigInteger(c, rowId, src, offset, scratch);
}
} else {
// direct buffer fallback: copy 8 bytes per value
byte[] data = new byte[8];
for (int i = 0; i < total; i++, rowId++) {
buffer.get(data);
- putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
+ putLittleEndianBytesAsBigInteger(c, rowId, data, 0, scratch);
}
}
}
@@ -239,9 +241,11 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
* @param src the source byte array containing little-endian encoded data
* @param offset the starting position in {@code src}; reads bytes
* {@code src[offset..offset+7]}
+ * @param scratch a caller-provided reusable buffer of at least 9 bytes; its contents
+ *   after this call are undefined
*/
private static void putLittleEndianBytesAsBigInteger(
- WritableColumnVector c, int rowId, byte[] src, int offset) {
+ WritableColumnVector c, int rowId, byte[] src, int offset, byte[] scratch) {
// src is little-endian; the most significant byte is at src[offset + 7].
// Scan from the most significant end to find the first non-zero byte,
// which determines the minimal number of bytes needed for encoding.
@@ -254,7 +258,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
// BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
// throws NumberFormatException("Zero length BigInteger").
if (msbIndex == offset && src[offset] == 0) {
- c.putByteArray(rowId, new byte[]{0});
+ scratch[0] = 0x00;
+ // putByteArray copies the bytes into arrayData(), so scratch can be safely reused
+ c.putByteArray(rowId, scratch, 0, 1);
return;
}
@@ -266,16 +272,16 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
int valueLen = msbIndex - offset + 1;
int totalLen = needSignByte ? valueLen + 1 : valueLen;
- byte[] dest = new byte[totalLen];
- int destOffset = 0;
+ int scratchOffset = 0;
if (needSignByte) {
- dest[destOffset++] = 0x00;
+ scratch[scratchOffset++] = 0x00;
}
// Reverse byte order: little-endian src → big-endian dest
for (int i = msbIndex; i >= offset; i--) {
- dest[destOffset++] = src[i];
+ scratch[scratchOffset++] = src[i];
}
- c.putByteArray(rowId, dest, 0, totalLen);
+ // putByteArray copies the bytes into arrayData(), so scratch can be safely reused
+ c.putByteArray(rowId, scratch, 0, totalLen);
}
// A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]