This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 954bb5765861 [SPARK-55652][SQL] Optimize 
`VectorizedPlainValuesReader.readShorts()` with direct array access for heap 
buffers
954bb5765861 is described below

commit 954bb5765861ad1babb65f0490d228b50d33e857
Author: yangjie01 <[email protected]>
AuthorDate: Wed Feb 25 10:33:41 2026 +0800

    [SPARK-55652][SQL] Optimize `VectorizedPlainValuesReader.readShorts()` with 
direct array access for heap buffers
    
    ### What changes were proposed in this pull request?
    
    This PR optimizes `VectorizedPlainValuesReader.readShorts` by introducing a 
new batch write method `putShortsFromIntsLittleEndian` in 
`WritableColumnVector`, `OnHeapColumnVector`, and `OffHeapColumnVector`.
    
    In Parquet, `SHORT` values are stored as 4-byte little-endian integers. The 
previous implementation read each value individually via `ByteBuffer.getInt()` 
and called `putShort()` per element, incurring a virtual method dispatch per 
value and preventing JIT vectorization.
    
    The new approach:
    1. Adds `putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex)` as an abstract method in `WritableColumnVector`, with 
implementations in both `OnHeapColumnVector` and `OffHeapColumnVector`.
    2. The implementations use `Platform.getInt` to read directly from the 
underlying `byte[]`, handle big-endian platforms by reversing bytes outside the 
loop, and write directly to `shortData[]` (OnHeap) or off-heap memory via 
`Platform.putShort` (OffHeap).
    3. `readShorts` in `VectorizedPlainValuesReader` delegates to 
`putShortsFromIntsLittleEndian` when `buffer.hasArray()` is true, matching the 
pattern already established by `readIntegers`, `readLongs`, `readFloats`, and 
`readDoubles`.
    
    ### Why are the changes needed?
    
    The previous implementation of `readShorts` did not take advantage of the 
`hasArray()` fast path that other fixed-width type readers (`readIntegers`, 
`readLongs`, etc.) already use. This caused unnecessary overhead from:
    - Per-element virtual method dispatch via `putShort()`
    - `ByteBuffer.getInt()` overhead including internal bounds checking and 
byte-order branching on every call
    
    By pushing the batch operation into `WritableColumnVector` and operating 
directly on the underlying array, the JIT compiler can more effectively inline 
and vectorize the tight loop, eliminating these overheads for the common 
heap-buffer case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - Pass Github Actions and add a new test scenario in `ColumnarBatchSuite` 
to test `WritableColumnVector#putShortsFromIntsLittleEndian`
    - Rename the original code to `OldVectorizedPlainValuesReader`, and compare 
the latency of the old and new `readShorts` methods using JMH:
    
    <details>
    <summary><b>Benchmark Code (click to expand)</b></summary>
    
    ```java
    
    package org.apache.spark.sql.execution.datasources.parquet;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    import org.openjdk.jmh.annotations.*;
    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;
    
    import org.apache.parquet.bytes.ByteBufferInputStream;
    
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    
    BenchmarkMode(Mode.AverageTime)
    OutputTimeUnit(TimeUnit.MICROSECONDS)
    State(Scope.Thread)
    Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
    Warmup(iterations = 5, time = 1)
    Measurement(iterations = 10, time = 1)
    public class VectorizedPlainValuesReaderJMHBenchmark {
    
        // ==================== Parameters ====================
    
        Param({"10000000"})
        private int numValues;
    
        // ==================== Test Data ====================
    
        private byte[] shortData;
        private static final int BATCH_SIZE = 4096;
    
        // Readers and streams for each scenario
        private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
        private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
        private VectorizedPlainValuesReader newSingleBufferOffHeapReader;
        private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
    
        // ==================== State Classes ====================
        State(Scope.Thread)
        public static class OnHeapColumnVectorState {
            public WritableColumnVector shortColumn;
            Setup(Level.Iteration)
            public void setup() {
                shortColumn = new OnHeapColumnVector(BATCH_SIZE, 
DataTypes.ShortType);
            }
            TearDown(Level.Iteration)
            public void tearDown() {
                shortColumn.close();
            }
            Setup(Level.Invocation)
            public void reset() {
                shortColumn.reset();
            }
        }
    
        // ==================== Setup ====================
    
        Setup(Level.Trial)
        public void setupTrial() {
            Random random = new Random(42);
            shortData = generateShortData(numValues, random);
        }
    
        TearDown(Level.Trial)
        public void tearDownTrial() {
        }
    
        Setup(Level.Invocation)
        public void setupInvocation() throws IOException {
            // OnHeap SingleBuffer
            newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
            newSingleBufferOnHeapReader.initFromPage(numValues, 
createSingleBufferInputStream(shortData));
            oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
            oldSingleBufferOnHeapReader.initFromPage(numValues, 
createSingleBufferInputStream(shortData));
            // OffHeap SingleBuffer
            newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
            newSingleBufferOffHeapReader.initFromPage(numValues, 
createDirectSingleBufferInputStream(shortData));
            oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
            oldSingleBufferOffHeapReader.initFromPage(numValues, 
createDirectSingleBufferInputStream(shortData));
        }
    
        // ==================== Data Generation ====================
        private byte[] generateShortData(int count, Random random) {
            ByteBuffer buffer = ByteBuffer.allocate(count * 
4).order(ByteOrder.LITTLE_ENDIAN);
            for (int i = 0; i < count; i++) {
                buffer.putInt(random.nextInt(65536) - 32768);
            }
            return buffer.array();
        }
    
        // ==================== ByteBufferInputStream Creation 
====================
    
        private ByteBufferInputStream createSingleBufferInputStream(byte[] 
data) {
            ByteBuffer buffer = 
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
            return ByteBufferInputStream.wrap(buffer);
        }
    
        private ByteBuffer createDirectBuffer(byte[] data) {
            ByteBuffer buffer = 
ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
            buffer.put(data);
            buffer.flip();
            return buffer;
        }
    
        private ByteBufferInputStream 
createDirectSingleBufferInputStream(byte[] data) {
            ByteBuffer buffer = createDirectBuffer(data);
            return ByteBufferInputStream.wrap(buffer);
        }
    
        // 
====================================================================================
        // readShorts OnHeap
        // 
====================================================================================
    
        Benchmark
        public void readShorts_onHeap_New(OnHeapColumnVectorState state) throws 
IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                newSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE, 
numValues - i), state.shortColumn, 0);
            }
        }
    
        Benchmark
        public void readShorts_onHeap_Old(OnHeapColumnVectorState state) throws 
IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                oldSingleBufferOnHeapReader.readShorts(Math.min(BATCH_SIZE, 
numValues - i), state.shortColumn, 0);
            }
        }
    
        // 
====================================================================================
        // readShorts offHeap
        // 
====================================================================================
    
        Benchmark
        public void readShorts_offHeap_New(OnHeapColumnVectorState state) 
throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                newSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE, 
numValues - i), state.shortColumn, 0);
            }
        }
    
        Benchmark
        public void readShorts_offHeap_Old(OnHeapColumnVectorState state) 
throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                oldSingleBufferOffHeapReader.readShorts(Math.min(BATCH_SIZE, 
numValues - i), state.shortColumn, 0);
            }
        }
    
        // ==================== Main Method ====================
    
        public static void main(String[] args) throws RunnerException {
            String filter = args.length > 0 ? args[0] : 
VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
            Options opt = new OptionsBuilder()
                    .include(filter)
                    .build();
    
            new Runner(opt).run();
        }
    }
    
    ```
    </details>
    
    Perform `build/sbt "sql/Test/runMain 
org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark"`
 to conduct the test
    
    **Benchmark results:**
    
    - Java 17.0.18+8-LTS
    
    ```
    Benchmark                                                       (numValues) 
 Mode  Cnt     Score    Error  Units
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New     10000000 
 avgt   10  4048.579 ± 54.466  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old     10000000 
 avgt   10  3952.443 ± 29.947  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New      10000000 
 avgt   10  4358.785 ± 45.051  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old      10000000 
 avgt   10  6775.679 ± 75.302  us/op
    ```
    
    - Java 21.0.10+7-LTS
    
    ```
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_New     10000000 
 avgt   10  3050.606 ± 57.169  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_offHeap_Old     10000000 
 avgt   10  7206.623 ± 29.275  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_New      10000000 
 avgt   10  3252.563 ± 44.564  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readShorts_onHeap_Old      10000000 
 avgt   10  7145.537 ±  8.843  us/op
    ```
    
    The test results reveal that the optimized OnHeap path achieves nearly 50%+ 
performance improvement. The OffHeap path shows no significant negative impact.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    The benchmark code used for performance testing was generated by GitHub 
Copilot.
    
    Closes #54441 from LuciferYang/readShorts.
    
    Lead-authored-by: yangjie01 <[email protected]>
    Co-authored-by: YangJie <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../datasources/parquet/VectorizedPlainValuesReader.java |  9 +++++++--
 .../sql/execution/vectorized/OffHeapColumnVector.java    | 16 ++++++++++++++++
 .../sql/execution/vectorized/OnHeapColumnVector.java     | 14 ++++++++++++++
 .../sql/execution/vectorized/WritableColumnVector.java   |  7 +++++++
 .../sql/execution/vectorized/ColumnarBatchSuite.scala    | 11 +++++++++++
 5 files changed, 55 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 7364fa5536c0..a040a8990bad 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -315,8 +315,13 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
     int requiredBytes = total * 4;
     ByteBuffer buffer = getBuffer(requiredBytes);
 
-    for (int i = 0; i < total; i += 1) {
-      c.putShort(rowId + i, (short) buffer.getInt());
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putShortsFromIntsLittleEndian(rowId, total, buffer.array(), offset);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putShort(rowId + i, (short) buffer.getInt());
+      }
     }
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 42454b283d09..b56a49d8ee40 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -267,6 +267,22 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
       null, data + rowId * 2L, count * 2L);
   }
 
+  @Override
+  public void putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex) {
+    int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+    long dstOffset = data + rowId * 2L;
+    if (bigEndianPlatform) {
+      for (int i = 0; i < count; ++i, srcOffset += 4, dstOffset += 2) {
+        Platform.putShort(null, dstOffset,
+          (short) Integer.reverseBytes(Platform.getInt(src, srcOffset)));
+      }
+    } else {
+      for (int i = 0; i < count; ++i, srcOffset += 4, dstOffset += 2) {
+        Platform.putShort(null, dstOffset, (short) Platform.getInt(src, 
srcOffset));
+      }
+    }
+  }
+
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 401e499fee30..0854c42db672 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -264,6 +264,20 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
       Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
   }
 
+  @Override
+  public void putShortsFromIntsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex) {
+    int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+    if (bigEndianPlatform) {
+      for (int i = 0; i < count; ++i, srcOffset += 4) {
+        shortData[rowId + i] = (short) 
Integer.reverseBytes(Platform.getInt(src, srcOffset));
+      }
+    } else {
+      for (int i = 0; i < count; ++i, srcOffset += 4) {
+        shortData[rowId + i] = (short) Platform.getInt(src, srcOffset);
+      }
+    }
+  }
+
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 904c48309778..7ce4139df47b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -281,6 +281,13 @@ public abstract class WritableColumnVector extends 
ColumnVector {
    */
   public abstract void putShorts(int rowId, int count, short[] src, int 
srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, 
rowId + count)
+   * Each 4-byte little endian int is truncated to a short.
+   */
+  public abstract void putShortsFromIntsLittleEndian(
+      int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Sets values from [src[srcIndex], src[srcIndex + count * 2]) to [rowId, 
rowId + count)
    * The data in src must be 2-byte platform native endian shorts.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 6d90bb985e26..0f2ca93f287c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -323,6 +323,17 @@ class ColumnarBatchSuite extends SparkFunSuite {
       reference += 4
       idx += 3
 
+      val intSrc = Array(0, 1, 32767, -32768, 65535, -1, 12345, -12345)
+      val count = intSrc.length
+      val byteBuffer = ByteBuffer.allocate(count * 
4).order(ByteOrder.LITTLE_ENDIAN)
+      intSrc.foreach(byteBuffer.putInt)
+      val byteArray = byteBuffer.array()
+      column.putShortsFromIntsLittleEndian(idx, count, byteArray, 0)
+      (0 until count).foreach { i =>
+        reference += intSrc(i).toShort
+      }
+      idx += count
+
       while (idx < column.capacity) {
         val single = random.nextBoolean()
         if (single) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to