This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f9a9c9402c92 [SPARK-55683][SQL] Optimize
`VectorizedPlainValuesReader.readUnsignedLongs`
f9a9c9402c92 is described below
commit f9a9c9402c929478ba23c1a000d6dbbabbed7b13
Author: yangjie01 <[email protected]>
AuthorDate: Wed Feb 25 09:14:19 2026 -0800
[SPARK-55683][SQL] Optimize `VectorizedPlainValuesReader.readUnsignedLongs`
### What changes were proposed in this pull request?
This PR optimizes `VectorizedPlainValuesReader.readUnsignedLongs` by
replacing the per-element `BigInteger` heap allocation chain with direct byte
manipulation.
The original implementation allocates multiple objects per element:
```java
// Old: String + BigInteger + internal int[] + byte[] allocations per
element
c.putByteArray(rowId + i,
new BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());
```
The new implementation reads raw little-endian bytes directly from the
`ByteBuffer` backing array (when available) and converts them to
`BigInteger`-compatible big-endian encoding in a single pass:
```java
// New: hasArray() fast path - operates directly on backing array, one
byte[] per element
if (buffer.hasArray()) {
byte[] src = buffer.array();
int offset = buffer.arrayOffset() + buffer.position();
for (int i = 0; i < total; i++, rowId++, offset += 8) {
putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
}
} else {
byte[] data = new byte[8]; // reused across all values in this batch
for (int i = 0; i < total; i++, rowId++) {
buffer.get(data);
putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
}
}
```
The private helper `putLittleEndianBytesAsBigInteger` handles the
conversion with output matching `BigInteger.toByteArray()` semantics:
- **Zero value**: writes `[0x00]` (1 byte) rather than an empty array,
since `new BigInteger(new byte[0])` throws `NumberFormatException`
- **Sign byte**: prepends `0x00` when the most significant byte has bit 7
set, to ensure the value is interpreted as positive by `BigInteger`
- **Byte order**: reverses little-endian Parquet physical encoding to
big-endian in a single loop
### Why are the changes needed?
The original implementation constructs a `BigInteger` via
`Long.toUnsignedString` + `new BigInteger(String)`, which involves per-element
allocations of a `String`, a `BigInteger`, its internal `int[]` magnitude
array, and the final `byte[]`. For a typical batch of 4096 values this means
~16K object allocations, creating significant GC pressure in workloads reading
large `UINT_64` columns.
The new implementation reduces this to one `byte[]` allocation per element
by operating directly on the raw bytes from the `ByteBuffer`, avoiding all
intermediate object creation. Additionally, the direct buffer fallback path
reuses a single `byte[8]` scratch buffer across the entire batch.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- The existing test `SPARK-34817: Read UINT_64 as Decimal from parquet` in
`ParquetIOSuite` was extended with boundary values covering the critical edge
cases of the new byte manipulation logic
- Rename the original code to `OldVectorizedPlainValuesReader`, and compare
the latency of the old and new `readUnsignedLongs` methods using JMH:
<details>
<summary><b>Benchmark Code (click to expand)</b></summary>
```java
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
public class VectorizedPlainValuesReaderJMHBenchmark {
// ==================== Parameters ====================
@Param({"10000000"})
private int numValues;
// ==================== Test Data ====================
private byte[] longData;
private static final int BATCH_SIZE = 4096;
private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
private VectorizedPlainValuesReader newSingleBufferOffHeapReader;
// ==================== State Classes ====================
/**
* Column vector state using DecimalType(20, 0), which is the correct
type for UINT64.
* Parquet UINT_64 logical type is mapped to DecimalType(20, 0) in
Spark.
* Using LongType would cause NullPointerException because
readUnsignedLongs
* calls arrayData() which requires childColumns, only initialized for
DecimalType.
*/
@State(Scope.Thread)
public static class DecimalColumnVectorState {
public WritableColumnVector decimalColumn;
@Setup(Level.Iteration)
public void setup() {
// UINT64 -> DecimalType(20, 0): precision=20, scale=0
decimalColumn = new OnHeapColumnVector(BATCH_SIZE,
DataTypes.createDecimalType(20, 0));
}
@TearDown(Level.Iteration)
public void tearDown() {
decimalColumn.close();
}
@Setup(Level.Invocation)
public void reset() {
decimalColumn.reset();
}
}
// ==================== Setup ====================
@Setup(Level.Trial)
public void setupTrial() {
Random random = new Random(42);
longData = generateLongData(numValues, random);
}
@Setup(Level.Invocation)
public void setupInvocation() throws IOException {
oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
oldSingleBufferOnHeapReader.initFromPage(numValues,
createSingleBufferInputStream(longData));
oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
oldSingleBufferOffHeapReader.initFromPage(numValues,
createDirectSingleBufferInputStream(longData));
newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
newSingleBufferOnHeapReader.initFromPage(numValues,
createSingleBufferInputStream(longData));
newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
newSingleBufferOffHeapReader.initFromPage(numValues,
createDirectSingleBufferInputStream(longData));
}
// ==================== Data Generation ====================
private byte[] generateLongData(int count, Random random) {
ByteBuffer buffer = ByteBuffer.allocate(count *
8).order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < count; i++) {
buffer.putLong(random.nextLong()); // full unsigned long range
}
return buffer.array();
}
// ==================== ByteBufferInputStream Creation
====================
private ByteBufferInputStream createSingleBufferInputStream(byte[]
data) {
ByteBuffer buffer =
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
return ByteBufferInputStream.wrap(buffer);
}
private ByteBuffer createDirectBuffer(byte[] data) {
ByteBuffer buffer =
ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
buffer.put(data);
buffer.flip();
return buffer;
}
private ByteBufferInputStream
createDirectSingleBufferInputStream(byte[] data) {
ByteBuffer buffer = createDirectBuffer(data);
return ByteBufferInputStream.wrap(buffer);
}
//
====================================================================================
// readUnsignedLongs onHeap
//
====================================================================================
@Benchmark
public void readUnsignedLongs_onHeap_Old(DecimalColumnVectorState
state) throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
oldSingleBufferOnHeapReader.readUnsignedLongs(
Math.min(BATCH_SIZE, numValues - i),
state.decimalColumn, 0);
}
}
@Benchmark
public void readUnsignedLongs_onHeap_New(DecimalColumnVectorState
state) throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
newSingleBufferOnHeapReader.readUnsignedLongs(
Math.min(BATCH_SIZE, numValues - i),
state.decimalColumn, 0);
}
}
//
====================================================================================
// readUnsignedLongs offHeap
//
====================================================================================
@Benchmark
public void readUnsignedLongs_offHeap_Old(DecimalColumnVectorState
state) throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
oldSingleBufferOffHeapReader.readUnsignedLongs(
Math.min(BATCH_SIZE, numValues - i),
state.decimalColumn, 0);
}
}
@Benchmark
public void readUnsignedLongs_offHeap_New(DecimalColumnVectorState
state) throws IOException {
for (int i = 0; i < numValues; i += BATCH_SIZE) {
newSingleBufferOffHeapReader.readUnsignedLongs(
Math.min(BATCH_SIZE, numValues - i),
state.decimalColumn, 0);
}
}
// ==================== Main Method ====================
public static void main(String[] args) throws RunnerException {
String filter = args.length > 0 ?
args[0] :
VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
Options opt = new OptionsBuilder()
.include(filter)
.build();
new Runner(opt).run();
}
}
```
</details>
Perform `build/sbt "sql/Test/runMain
org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark"`
to conduct the test
**Benchmark results:**
- Java 17.0.18+8-LTS
```
Benchmark
(numValues) Mode Cnt Score Error Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New
10000000 avgt 10 249413.824 ± 12242.331 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old
10000000 avgt 10 2301279.127 ± 14970.249 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New
10000000 avgt 10 282651.747 ± 5031.717 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old
10000000 avgt 10 2382690.093 ± 10364.228 us/op
```
- Java 21.0.10+7-LTS
```
Benchmark
(numValues) Mode Cnt Score Error Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New
10000000 avgt 10 256621.630 ± 24087.509 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old
10000000 avgt 10 2120170.591 ± 4862.317 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New
10000000 avgt 10 284058.229 ± 19966.179 us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old
10000000 avgt 10 2190838.305 ± 7979.740 us/op
```
Both onHeap and offHeap paths show approximately **8x** improvement.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude Sonnet 4.6 was used to assist in completing the code writing.
Closes #54479 from LuciferYang/SPARK-55683.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../parquet/VectorizedPlainValuesReader.java | 81 ++++++++++++++++++++--
.../datasources/parquet/ParquetIOSuite.scala | 18 +++--
2 files changed, 91 insertions(+), 8 deletions(-)
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index a040a8990bad..9ad07a1ff5ee 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
-import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -199,10 +198,84 @@ public class VectorizedPlainValuesReader extends
ValuesReader implements Vectori
public final void readUnsignedLongs(int total, WritableColumnVector c, int
rowId) {
int requiredBytes = total * 8;
ByteBuffer buffer = getBuffer(requiredBytes);
- for (int i = 0; i < total; i += 1) {
- c.putByteArray(
- rowId + i, new
BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());
+ if (buffer.hasArray()) {
+ byte[] src = buffer.array();
+ int offset = buffer.arrayOffset() + buffer.position();
+ for (int i = 0; i < total; i++, rowId++, offset += 8) {
+ putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
+ }
+ } else {
+ // direct buffer fallback: copy 8 bytes per value
+ byte[] data = new byte[8];
+ for (int i = 0; i < total; i++, rowId++) {
+ buffer.get(data);
+ putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
+ }
+ }
+ }
+
+ /**
+ * Writes 8 little-endian bytes from {@code src[offset..offset+7]} into
{@code c} at
+ * {@code rowId} as a big-endian byte array compatible with {@link
java.math.BigInteger}
+ * two's-complement encoding.
+ *
+ * <p>The output matches the semantics of {@link
java.math.BigInteger#toByteArray()}:
+ * <ul>
+ * <li>Big-endian byte order</li>
+ * <li>Minimal encoding: no unnecessary leading zero bytes</li>
+ * <li>A {@code 0x00} sign byte is prepended if the most significant byte
has bit 7
+ * set, ensuring the value is interpreted as positive by {@code
BigInteger}</li>
+ * <li>Zero is encoded as {@code [0x00]} (1 byte), not an empty array,
because
+ * {@code new BigInteger(new byte[0])} throws {@link
NumberFormatException}</li>
+ * </ul>
+ *
+ * <p>This is used by {@link #readUnsignedLongs} to store Parquet {@code
UINT_64} values
+ * into a {@code DecimalType(20, 0)} column vector, where each value is
stored as a
+ * byte array in {@code arrayData()} and later reconstructed via
+ * {@code new BigInteger(bytes)}.
+ *
+ * @param c the target column vector; must be of {@code DecimalType(20,
0)}
+ * @param rowId the row index to write into
+ * @param src the source byte array containing little-endian encoded data
+ * @param offset the starting position in {@code src}; reads bytes
+ * {@code src[offset..offset+7]}
+ */
+ private static void putLittleEndianBytesAsBigInteger(
+ WritableColumnVector c, int rowId, byte[] src, int offset) {
+ // src is little-endian; the most significant byte is at src[offset + 7].
+ // Scan from the most significant end to find the first non-zero byte,
+ // which determines the minimal number of bytes needed for encoding.
+ int msbIndex = offset + 7;
+ while (msbIndex > offset && src[msbIndex] == 0) {
+ msbIndex--;
+ }
+
+ // Zero value: must write [0x00] rather than an empty array.
+ // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new
byte[0])
+ // throws NumberFormatException("Zero length BigInteger").
+ if (msbIndex == offset && src[offset] == 0) {
+ c.putByteArray(rowId, new byte[]{0});
+ return;
+ }
+
+ // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
+ // This matches BigInteger.toByteArray() behavior: positive values whose
highest
+ // magnitude byte has the MSB set are prefixed with 0x00 to distinguish
them
+ // from negative values in two's-complement encoding.
+ boolean needSignByte = (src[msbIndex] & 0x80) != 0;
+ int valueLen = msbIndex - offset + 1;
+ int totalLen = needSignByte ? valueLen + 1 : valueLen;
+
+ byte[] dest = new byte[totalLen];
+ int destOffset = 0;
+ if (needSignByte) {
+ dest[destOffset++] = 0x00;
+ }
+ // Reverse byte order: little-endian src → big-endian dest
+ for (int i = msbIndex; i >= offset; i--) {
+ dest[destOffset++] = src[i];
}
+ c.putByteArray(rowId, dest, 0, totalLen);
}
// A fork of `readLongs` to rebase the timestamp values. For performance
reasons, this method
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3072657a0954..b52a464c5bd2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.math.{BigDecimal => JBigDecimal}
import java.time.{LocalDateTime, LocalTime}
import java.util.Locale
@@ -1261,11 +1262,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
val writer = createParquetWriter(schema, path, dictionaryEnabled)
val factory = new SimpleGroupFactory(schema)
+ // Original range retained to avoid regression
(-500 until 500).foreach { i =>
val group = factory.newGroup()
.append("a", i % 100L)
writer.write(group)
}
+ // Boundary values: zero, one, signed extremes interpreted as unsigned
+ Seq(0L, 1L, Long.MaxValue, Long.MinValue, -2L, -1L).foreach { v =>
+ val group = factory.newGroup().append("a", v)
+ writer.write(group)
+ }
writer.close()
}
@@ -1273,10 +1280,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawParquetFile(path)
readParquetFile(path.toString) { df =>
- checkAnswer(df, (-500 until 500).map { i =>
- val bi = UnsignedLong.fromLongBits(i % 100L).bigIntegerValue()
- Row(new java.math.BigDecimal(bi))
- })
+ val originalExpected = (-500 until 500).map { i =>
+ Row(new JBigDecimal(UnsignedLong.fromLongBits(i %
100L).bigIntegerValue()))
+ }
+ val boundaryExpected = Seq(0L, 1L, Long.MaxValue, Long.MinValue,
-2L, -1L).map { v =>
+ Row(new
JBigDecimal(UnsignedLong.fromLongBits(v).bigIntegerValue()))
+ }
+ checkAnswer(df, originalExpected ++ boundaryExpected)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]