This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 954bb5765861 [SPARK-55652][SQL] Optimize
`VectorizedPlainValuesReader.readShorts()` with direct array access for heap
buffers
954bb5765861 is described below
commit 954bb5765861ad1babb65f0490d228b50d33e857
Author: yangjie01 <[email protected]>
AuthorDate: Wed Feb 25 10:33:41 2026 +0800
[SPARK-55652][SQL] Optimize `VectorizedPlainValuesReader.readShorts()` with
direct array access for heap buffers
### What changes were proposed in this pull request?
This PR optimizes `VectorizedPlainValuesReader.readShorts` by introducing a
new batch write method `putShortsFromIntsLittleEndian` in
`WritableColumnVector`, `OnHeapColumnVector`, and `OffHeapColumnVector`.
In Parquet, `SHORT` values are stored as 4-byte little-endian integers. The
previous implementation read each value individually via `ByteBuffer.getInt()`
and called `putShort()` per element, incurring a virtual method dispatch per
value and preventing JIT vectorization.
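As a quick, self-contained illustration of that encoding (plain JDK APIs only; this snippet is not part of the patch):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustration: Parquet encodes a SHORT as a 4-byte little-endian int;
// casting the int back to short keeps the low 16 bits and recovers the value.
public class ShortAsIntDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
    buf.putInt(-12345);              // stored bytes: C7 CF FF FF
    buf.flip();
    short s = (short) buf.getInt();  // truncate the 4-byte int to a short
    System.out.println(s);           // prints -12345
  }
}
```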
The new approach:
1. Adds `putShortsFromIntsLittleEndian(int rowId, int count, byte[] src,
int srcIndex)` as an abstract method in `WritableColumnVector`, with
implementations in both `OnHeapColumnVector` and `OffHeapColumnVector`.
2. The implementations use `Platform.getInt` to read directly from the
underlying `byte[]`, handle big-endian platforms by reversing bytes outside the
loop, and write directly to `shortData[]` (OnHeap) or off-heap memory via
`Platform.putShort` (OffHeap).
3. `readShorts` in `VectorizedPlainValuesReader` delegates to
`putShortsFromIntsLittleEndian` when `buffer.hasArray()` is true, matching the
pattern already established by `readIntegers`, `readLongs`, `readFloats`, and
`readDoubles`.
### Why are the changes needed?
The previous implementation of `readShorts` did not take advantage of the
`hasArray()` fast path that other fixed-width type readers (`readIntegers`,
`readLongs`, etc.) already use. This caused unnecessary overhead from:
- Per-element virtual method dispatch via `putShort()`
- `ByteBuffer.getInt()` overhead including internal bounds checking and
byte-order branching on every call
By pushing the batch operation into `WritableColumnVector` and operating
directly on the underlying array, the JIT compiler can more effectively inline
and vectorize the tight loop, eliminating these overheads for the common
heap-buffer case.
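A minimal sketch of the resulting loop shape (illustrative only, not the committed code; `dst` stands in for `shortData` or the off-heap destination, and the real implementations appear in the diff at the end of this message):

```java
import java.nio.ByteOrder;
import org.apache.spark.unsafe.Platform;

// Sketch: the endianness check is hoisted out of the loop, so the common
// little-endian case is a tight read-truncate-store loop that the JIT can
// inline and auto-vectorize.
final class PutShortsSketch {
  private static final boolean BIG_ENDIAN =
      ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);

  static void putShortsFromIntsLittleEndian(
      short[] dst, int rowId, int count, byte[] src, int srcIndex) {
    long srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
    if (BIG_ENDIAN) {
      for (int i = 0; i < count; ++i, srcOffset += 4) {
        dst[rowId + i] = (short) Integer.reverseBytes(Platform.getInt(src, srcOffset));
      }
    } else {
      for (int i = 0; i < count; ++i, srcOffset += 4) {
        dst[rowId + i] = (short) Platform.getInt(src, srcOffset);
      }
    }
  }
}
```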
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GitHub Actions and add a new test scenario in `ColumnarBatchSuite`
covering `WritableColumnVector#putShortsFromIntsLittleEndian`
- Rename the original implementation to `OldVectorizedPlainValuesReader` and
compare the latency of the old and new `readShorts` methods using JMH:
<details>
<summary><b>Benchmark Code (click to expand)</b></summary>
```java
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
public class VectorizedPlainValuesReaderJMHBenchmark {
// ==================== Parameters ====================
@Param({"10000000"})
private int numValues;
// ==================== Test Data ====================
private byte[] shortData;
private static final int BATCH_SIZE = 4096;
// Readers and streams for each scenario
private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
private VectorizedPlainValuesReader newSingleBufferOffHeapReader;
private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
// ==================== State Classes ====================
@State(Scope.Thread)
public static class OnHeapColumnVectorState {
public WritableColumnVector shortColumn;
@Setup(Level.Iteration)
public void setup() {
shortColumn = new OnHeapColumnVector(BATCH_SIZE,
DataTypes.ShortType);
}
@TearDown(Level.Iteration)
public void tearDown() {
shortColumn.close();
}
@Setup(Level.Invocation)
public void reset() {
shortColumn.reset();
}
}
// ==================== Setup ====================
@Setup(Level.Trial)
public void setupTrial() {
Random random = new Random(42);
shortData = generateShortData(numValues, random);
}
@TearDown(Level.Trial)
public void tearDownTrial() {
}
@Setup(Level.Invocation)
public void setupInvocation() throws IOException {
// OnHeap SingleBuffer
newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
newSingleBufferOnHeapReader.initFromPage(numValues,
createSingleBufferInputStream(shortData));
oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
oldSingleBufferOnHeapReader.initFromPage(numValues,
createSingleBufferInputStream(shortData));
// OffHeap SingleBuffer
newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
newSingleBufferOffHeapReader.initFromPage(numValues,
createDirectSingleBufferInputStream(shortData));
oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
oldSingleBufferOffHeapReader.initFromPage(numValues,
createDirectSingleBufferInputStream(shortData));
}
// ==================== Data Generation ====================
private byte[] generateShortData(int count, Random random) {
ByteBuffer buffer = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < count; i++) {
buffer.putInt(random.nextInt(65536) - 32768);
}
return buffer.array();
}
// ==================== ByteBufferInputStream Creation ====================
private ByteBufferInputStream createSingleBufferInputStream(byte[] data) {
ByteBuffer buffer =
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
return ByteBufferInputStream.wrap(buffer);
}
private ByteBuffer createDirectBuffer(byte[] data) {
ByteBuffer buffer =
ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
buffer.put(data);
buffer.flip();
return buffer;
}
private ByteBufferInputStream createDirectSingleBufferInputStream(byte[] data) {
ByteBuffer buffer = createDirectBuffer(data);
return ByteBufferInputStream.wrap(buffer);
}
// ====================================================================================
// readShorts OnHeap
// ====================================================================================
@Benchmark
public void readShorts_onHeap_New(OnHeapColumnVectorState state) throws
IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
newSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE,
numValues - i), state.shortColumn, 0);
}
}
@Benchmark
public void readShorts_onHeap_Old(OnHeapColumnVectorState state) throws
IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
oldSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE,
numValues - i), state.shortColumn, 0);
}
}
// ====================================================================================
// readShorts offHeap
// ====================================================================================
@Benchmark
public void readShorts_offHeap_New(OnHeapColumnVectorState state)
throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
newSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE,
numValues - i), state.shortColumn, 0);
}
}
@Benchmark
public void readShorts_offHeap_Old(OnHeapColumnVectorState state)
throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
oldSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE,
numValues - i), state.shortColumn, 0);
}
}
// ==================== Main Method ====================
public static void main(String[] args) throws RunnerException {
String filter = args.length > 0 ? args[0] :
VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
Options opt = new OptionsBuilder()
.include(filter)
.build();
new Runner(opt).run();
}
}
```
</details>
Run `build/sbt "sql/Test/runMain
org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark"`
to execute the benchmark.
**Benchmark results:**
- Java 17.0.18+8-LTS
```
Benchmark                                                        (numValues)  Mode  Cnt     Score    Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New      10000000  avgt   10  4048.579 ± 54.466  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old      10000000  avgt   10  3952.443 ± 29.947  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New       10000000  avgt   10  4358.785 ± 45.051  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old       10000000  avgt   10  6775.679 ± 75.302  us/op
```
- Java 21.0.10+7-LTS
```
Benchmark                                                        (numValues)  Mode  Cnt     Score    Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New      10000000  avgt   10  3050.606 ± 57.169  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old      10000000  avgt   10  7206.623 ± 29.275  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New       10000000  avgt   10  3252.563 ± 44.564  us/op
VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old       10000000  avgt   10  7145.537 ±  8.843  us/op
```
The results show that the optimized on-heap path is roughly 36% faster on
Java 17 and about 54% faster on Java 21, while the off-heap path shows no
significant regression on Java 17 and is also substantially faster on Java 21.
### Was this patch authored or co-authored using generative AI tooling?
The benchmark code used for performance testing was generated by GitHub
Copilot.
Closes #54441 from LuciferYang/readShorts.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../datasources/parquet/VectorizedPlainValuesReader.java | 9 +++++++--
.../sql/execution/vectorized/OffHeapColumnVector.java | 16 ++++++++++++++++
.../sql/execution/vectorized/OnHeapColumnVector.java | 14 ++++++++++++++
.../sql/execution/vectorized/WritableColumnVector.java | 7 +++++++
.../sql/execution/vectorized/ColumnarBatchSuite.scala | 11 +++++++++++
5 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 7364fa5536c0..a040a8990bad 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -315,8 +315,13 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);
- for (int i = 0; i < total; i += 1) {
- c.putShort(rowId + i, (short) buffer.getInt());
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putShortsFromIntsLittleEndian(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putShort(rowId + i, (short) buffer.getInt());
+ }
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 42454b283d09..b56a49d8ee40 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -267,6 +267,22 @@ public final class OffHeapColumnVector extends WritableColumnVector {
null, data + rowId * 2L, count * 2L);
}
+ @Override
+ public void putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ long dstOffset = data + rowId * 2L;
+ if (bigEndianPlatform) {
+ for (int i = 0; i < count; ++i, srcOffset += 4, dstOffset += 2) {
+ Platform.putShort(null, dstOffset,
+ (short) Integer.reverseBytes(Platform.getInt(src, srcOffset)));
+ }
+ } else {
+ for (int i = 0; i < count; ++i, srcOffset += 4, dstOffset += 2) {
+ Platform.putShort(null, dstOffset, (short) Platform.getInt(src, srcOffset));
+ }
+ }
+ }
+
@Override
public short getShort(int rowId) {
if (dictionary == null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 401e499fee30..0854c42db672 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -264,6 +264,20 @@ public final class OnHeapColumnVector extends WritableColumnVector {
Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
}
+ @Override
+ public void putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ if (bigEndianPlatform) {
+ for (int i = 0; i < count; ++i, srcOffset += 4) {
+ shortData[rowId + i] = (short) Integer.reverseBytes(Platform.getInt(src, srcOffset));
+ }
+ } else {
+ for (int i = 0; i < count; ++i, srcOffset += 4) {
+ shortData[rowId + i] = (short) Platform.getInt(src, srcOffset);
+ }
+ }
+ }
+
@Override
public short getShort(int rowId) {
if (dictionary == null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 904c48309778..7ce4139df47b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -281,6 +281,13 @@ public abstract class WritableColumnVector extends ColumnVector {
*/
public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
+ /**
+ * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
+ * Each 4-byte little endian int is truncated to a short.
+ */
+ public abstract void putShortsFromIntsLittleEndian(
+ int rowId, int count, byte[] src, int srcIndex);
+
/**
* Sets values from [src[srcIndex], src[srcIndex + count * 2]) to [rowId, rowId + count)
* The data in src must be 2-byte platform native endian shorts.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 6d90bb985e26..0f2ca93f287c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -323,6 +323,17 @@ class ColumnarBatchSuite extends SparkFunSuite {
reference += 4
idx += 3
+ val intSrc = Array(0, 1, 32767, -32768, 65535, -1, 12345, -12345)
+ val count = intSrc.length
+ val byteBuffer = ByteBuffer.allocate(count * 4).order(ByteOrder.LITTLE_ENDIAN)
+ intSrc.foreach(byteBuffer.putInt)
+ val byteArray = byteBuffer.array()
+ column.putShortsFromIntsLittleEndian(idx, count, byteArray, 0)
+ (0 until count).foreach { i =>
+ reference += intSrc(i).toShort
+ }
+ idx += count
+
while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]