This is an automated email from the ASF dual-hosted git repository.
LuciferYang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 59471088cc87 [SPARK-56802][SQL] Add bulk read+widen path for FLOAT to Double Parquet vector updater
59471088cc87 is described below
commit 59471088cc87b5b83337f5b470219ab22cc4b7c4
Author: YangJie <[email protected]>
AuthorDate: Wed May 13 21:54:08 2026 +0800
[SPARK-56802][SQL] Add bulk read+widen path for FLOAT to Double Parquet vector updater
### What changes were proposed in this pull request?
Extend the bulk read+widen pattern introduced in SPARK-56791 to
`FloatToDoubleUpdater` (a Parquet FLOAT column read into a Spark `DoubleType` column).
A new `readFloatsAsDoubles` default method on `VectorizedValuesReader` provides
the per-row fallback. `VectorizedPlainValuesReader` overrides it to fetch the
source bytes once via `getBuffer(total * 4)` and run a tight in-method
conversion loop. `FloatToDoubleUpdater.readValues` becomes a one-line
delegation. The widening is Java's primitive float-to-double conversion: exact
for every finite and infinite float; a NaN float widens to a double NaN (the
JVM may canonicalize the payload).
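The widening semantics can be checked with plain Java. The following is a minimal, illustrative sketch (the class and method names are hypothetical, not part of Spark): it widens sample floats with the primitive conversion, confirms that narrowing back reproduces the original bits for every non-NaN input, and shows that a NaN stays a NaN. It also shows that widening is exact rather than corrective: `0.1f` widens to a double that differs from the literal `0.1`.

```java
// Hypothetical demo of Java's primitive float -> double widening conversion.
final class FloatWidenDemo {

    // Implicit widening conversion; every float value is representable as a double.
    static double widen(float f) {
        return f;
    }

    // True if narrowing the widened value back to float reproduces the exact bits.
    // NaN inputs are excluded: widening only guarantees that a NaN stays a NaN.
    static boolean roundTripsExactly(float f) {
        return Float.floatToRawIntBits((float) widen(f)) == Float.floatToRawIntBits(f);
    }

    public static void main(String[] args) {
        float[] samples = {
            -Float.MAX_VALUE, // Scala's Float.MinValue (most negative finite float)
            Float.MIN_VALUE,  // Java's smallest positive subnormal float
            -1.5f, -0.0f, 0.0f, Float.MAX_VALUE,
            Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY
        };
        for (float f : samples) {
            System.out.println(f + " -> " + widen(f) + ", exact round trip: " + roundTripsExactly(f));
        }
        System.out.println("NaN widens to NaN: " + Double.isNaN(widen(Float.NaN)));
        // The float's own representation error is carried over, not corrected.
        System.out.println("widen(0.1f) == 0.1: " + (widen(0.1f) == 0.1));
    }
}
```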
### Why are the changes needed?
On the legacy path, `FloatToDoubleUpdater.readValues` calls `readFloat()` once
per element, and each call allocates a fresh `ByteBuffer` slice inside
`getBuffer(4)`; that allocation dominates the loop. Collapsing N allocations
into one is the same win SPARK-56791 delivered for the INT32 -> Long sibling,
and, as there, the gain grows on newer JDKs, where escape analysis better
optimizes the tight loop.
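The allocation argument can be modeled outside Spark. In this simplified sketch all names are hypothetical: `FloatValuesSource` stands in for `VectorizedValuesReader`, a `double[]` stands in for `WritableColumnVector`, and `PlainFloatSource.getBuffer` imitates a PLAIN reader by handing out a new little-endian `ByteBuffer` view of the next `length` bytes. A counter records how many views are created, so the per-row shape (one view per value) can be compared with the bulk override (one view per batch). Spark's real readers differ in detail.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical, simplified model of the default-method + bulk-override pattern.
final class BulkWidenSketch {

    // Stand-in for a values reader interface.
    interface FloatValuesSource {
        float readFloat();

        // Per-row fallback: one readFloat() call per value.
        default void readFloatsAsDoubles(int total, double[] out, int rowId) {
            for (int i = 0; i < total; i++) {
                out[rowId + i] = readFloat();
            }
        }
    }

    // Stand-in for a PLAIN-encoded reader over little-endian bytes.
    static final class PlainFloatSource implements FloatValuesSource {
        private final ByteBuffer in;
        int slicesCreated = 0; // number of ByteBuffer views handed out

        PlainFloatSource(byte[] bytes) {
            this.in = ByteBuffer.wrap(bytes);
        }

        // Returns a little-endian view of the next `length` bytes and advances past them.
        private ByteBuffer getBuffer(int length) {
            ByteBuffer slice = in.slice();
            slice.limit(length);
            in.position(in.position() + length);
            slicesCreated++;
            return slice.order(ByteOrder.LITTLE_ENDIAN);
        }

        @Override
        public float readFloat() {
            return getBuffer(4).getFloat(); // one new view per value
        }

        // Bulk override: one view for the whole batch, then a tight widening loop.
        @Override
        public void readFloatsAsDoubles(int total, double[] out, int rowId) {
            ByteBuffer buffer = getBuffer(total * 4);
            for (int i = 0; i < total; i++) {
                out[rowId + i] = buffer.getFloat();
            }
        }
    }

    // Lays floats out as PLAIN encoding does: 4 little-endian bytes each.
    static byte[] plainFloats(float... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (float v : values) {
            buf.putFloat(v);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        float[] input = {-1.5f, -0.0f, 0.0f, Float.MAX_VALUE, 2.25f};
        byte[] bytes = plainFloats(input);

        // Legacy shape: the updater loops over readFloat().
        PlainFloatSource perRow = new PlainFloatSource(bytes);
        double[] a = new double[input.length];
        for (int i = 0; i < input.length; i++) {
            a[i] = perRow.readFloat();
        }

        // New shape: a single delegation to the bulk method.
        PlainFloatSource bulk = new PlainFloatSource(bytes);
        double[] b = new double[input.length];
        bulk.readFloatsAsDoubles(input.length, b, 0);

        System.out.println("per-row views: " + perRow.slicesCreated + ", bulk views: " + bulk.slicesCreated);
        System.out.println("same output: " + java.util.Arrays.equals(a, b));
    }
}
```

Both shapes produce identical doubles; only the number of intermediate `ByteBuffer` objects changes, which the description above identifies as the dominant cost.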
The baseline and post-change rates below are taken directly from the
benchmark-result files committed on this branch:
| JDK | Baseline | After | Speedup |
|----:|---------:|----------:|--------:|
| 17 | 480.2 M/s | 1418.8 M/s | 2.95x |
| 21 | 489.5 M/s | 2527.2 M/s | 5.16x |
| 25 | 483.3 M/s | 2570.7 M/s | 5.32x |
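The speedup column is the ratio of the two rates. This small sketch (class name hypothetical) recomputes it from the table and also derives the per-row cost, which the benchmark files report as `Per Row(ns)` and which equals 1000 divided by `Rate(M/s)`.

```java
import java.util.Locale;

// Recomputes the derived columns of the JDK speedup table from its raw rates.
final class SpeedupCheck {
    static final String[] JDKS = {"17", "21", "25"};
    static final double[] BASELINE = {480.2, 489.5, 483.3}; // M/s, before
    static final double[] AFTER = {1418.8, 2527.2, 2570.7}; // M/s, after

    // Speedup is the ratio of throughputs (after / before).
    static double speedup(double baseline, double after) {
        return after / baseline;
    }

    // "Per Row(ns)" in the benchmark files: 1e9 ns / (rate * 1e6 rows) = 1000 / rate.
    static double nsPerRow(double rateMPerSec) {
        return 1000.0 / rateMPerSec;
    }

    public static void main(String[] args) {
        for (int i = 0; i < JDKS.length; i++) {
            System.out.println(String.format(Locale.ROOT,
                "JDK %s: %.2fx speedup, %.1f -> %.1f ns/row",
                JDKS[i], speedup(BASELINE[i], AFTER[i]),
                nsPerRow(BASELINE[i]), nsPerRow(AFTER[i])));
        }
    }
}
```

The recomputed per-row costs match the `FloatToDoubleUpdater` rows in the committed result files (for example, 2.1 ns before and 0.7 ns after on JDK 17).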
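Most peer Updaters in the same benchmark groups are unchanged within run-to-run
noise. The few that move noticeably (for example, `BooleanUpdater` on JDK 17 and
`IntegerToDecimalUpdater` on JDK 21 slow down, while `DowncastLongUpdater` on
JDK 25 speeds up) do not call the modified code, so the change is local to
`FloatToDoubleUpdater`.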
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths
(0, 1, 7, 8, 9, 17, 1024, 4097), the singular `readValue` path, and special
values (signed zeros, +/-Infinity, NaN, with raw-bit assertions on signed
zeros). A NaN-safe `assertDoublesEqual` helper uses `java.lang.Double.compare`
so NaN equality and signed-zero distinction are well-defined.
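The helper relies on `Double.compare` rather than primitive `==`, because `==` reports `NaN != NaN` and `-0.0 == 0.0`, the opposite of what these tests need. A hypothetical Java equivalent of the check (the suite's actual `assertDoublesEqual` is Scala and asserts per element rather than returning a boolean):

```java
// Hypothetical Java counterpart of the suite's Scala assertDoublesEqual check.
final class NanSafeCompare {

    // Element-wise equality via Double.compare, which treats every NaN as equal
    // to every other NaN and orders -0.0 strictly below +0.0.
    static boolean doublesEqual(double[] actual, double[] expected) {
        if (actual.length != expected.length) {
            return false;
        }
        for (int i = 0; i < actual.length; i++) {
            if (Double.compare(actual[i], expected[i]) != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        double[] widened = {(double) Float.NaN, (double) -0.0f};
        double[] expected = {Double.NaN, -0.0};
        // Primitive == is unsuitable here on both counts.
        System.out.println("NaN == NaN: " + (Double.NaN == Double.NaN));
        System.out.println("-0.0 == 0.0: " + (-0.0 == 0.0));
        System.out.println("doublesEqual(widened, expected): " + doublesEqual(widened, expected));
        System.out.println("-0.0 vs 0.0 equal: " + doublesEqual(new double[] {-0.0}, new double[] {0.0}));
    }
}
```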
A new end-to-end test in `ParquetIOSuite` round-trips FLOAT values written to
Parquet and read back as `DoubleType`, exercising both REQUIRED columns (no
def-levels) and OPTIONAL columns with interleaved nulls so that both `readValue`
and `readValues` are invoked.
Benchmark results on JDK 17, 21, and 25 are committed on the branch.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #55816 from LuciferYang/SPARK-56802-float-to-double.
Authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit bc4bf69f5eefdee84e18e022ace8a9427e4d3b3a)
Signed-off-by: yangjie01 <[email protected]>
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 +++++-----
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 46 +++++-----
.../ParquetVectorUpdaterBenchmark-results.txt | 46 +++++-----
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 12 +++
.../parquet/VectorizedValuesReader.java | 21 +++++
.../datasources/parquet/ParquetIOSuite.scala | 49 +++++++++++
.../parquet/ParquetVectorUpdaterSuite.scala | 99 ++++++++++++++++++++++
8 files changed, 251 insertions(+), 72 deletions(-)
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index a4c48eff31a6..7d09c6fd187e 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 16954.6 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3764.8 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1 0 1677.4 0.6 0.1X
-IntegerUpdater 0 0 0 10269.9 0.1 0.6X
-LongUpdater 0 0 0 5120.9 0.2 0.3X
-FloatUpdater 0 0 0 10252.8 0.1 0.6X
-DoubleUpdater 0 0 0 5133.4 0.2 0.3X
-BinaryUpdater 15 15 3 71.0 14.1 0.0X
+BooleanUpdater 0 0 0 17004.4 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3758.6 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1 0 1678.4 0.6 0.1X
+IntegerUpdater 0 0 0 10276.1 0.1 0.6X
+LongUpdater 0 0 0 5112.7 0.2 0.3X
+FloatUpdater 0 0 0 10293.3 0.1 0.6X
+DoubleUpdater 0 0 0 3870.8 0.3 0.2X
+BinaryUpdater 15 15 1 71.3 14.0 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0 0 6220.9 0.2 1.0X
-IntegerToDoubleUpdater 0 0 0 6215.0 0.2 1.0X
-FloatToDoubleUpdater 2 2 0 489.5 2.0 0.1X
-DateToTimestampNTZUpdater 29 29 1 36.1 27.7 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.2 2.2 0.1X
+IntegerToLongUpdater 0 0 0 5145.1 0.2 1.0X
+IntegerToDoubleUpdater 0 0 0 5120.5 0.2 1.0X
+FloatToDoubleUpdater 0 0 0 2527.2 0.4 0.5X
+DateToTimestampNTZUpdater 29 29 0 36.3 27.5 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.6 2.2 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3645.1 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2663.5 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 419.9 2.4 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3647.4 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2668.6 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 420.3 2.4 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5846.6 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 0 60.3 16.6 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5089.6 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 1 60.2 16.6 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10296.2 0.1 1.0X
-LongToDecimalUpdater 0 0 0 5145.8 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21 1 50.2 19.9 0.0X
+IntegerToDecimalUpdater 0 0 0 7746.5 0.1 1.0X
+LongToDecimalUpdater 0 0 0 3874.1 0.3 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21 0 50.7 19.7 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 20 21 1 52.7 19.0 1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19 20 2 54.1 18.5 1.0X
FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 132.7 7.5 2.5X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 133.2 7.5 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index a22613b7dd4f..627835735a08 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 17171.8 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3675.3 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1 0 1661.2 0.6 0.1X
-IntegerUpdater 0 0 0 10301.4 0.1 0.6X
-LongUpdater 0 0 0 5147.9 0.2 0.3X
-FloatUpdater 0 0 0 10294.3 0.1 0.6X
-DoubleUpdater 0 0 0 5113.4 0.2 0.3X
-BinaryUpdater 16 16 1 66.9 14.9 0.0X
+BooleanUpdater 0 0 0 17138.1 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3678.2 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1 0 1662.8 0.6 0.1X
+IntegerUpdater 0 0 0 10231.8 0.1 0.6X
+LongUpdater 0 0 0 5103.2 0.2 0.3X
+FloatUpdater 0 0 0 10203.9 0.1 0.6X
+DoubleUpdater 0 0 0 5105.4 0.2 0.3X
+BinaryUpdater 17 17 0 62.8 15.9 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0 0 6351.2 0.2 1.0X
-IntegerToDoubleUpdater 0 0 0 6423.7 0.2 1.0X
-FloatToDoubleUpdater 2 2 0 483.3 2.1 0.1X
-DateToTimestampNTZUpdater 29 29 1 36.6 27.3 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.4 2.2 0.1X
+IntegerToLongUpdater 0 0 0 6239.9 0.2 1.0X
+IntegerToDoubleUpdater 0 0 0 6382.6 0.2 1.0X
+FloatToDoubleUpdater 0 0 0 2570.7 0.4 0.4X
+DateToTimestampNTZUpdater 26 26 0 40.5 24.7 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 637.7 1.6 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3653.2 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2659.6 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3 0 371.1 2.7 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3660.5 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2663.6 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 521.3 1.9 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6324.8 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17 0 60.4 16.5 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6196.3 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 1 60.3 16.6 0.0X
================================================================================================
@@ -64,8 +64,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10258.9 0.1 1.0X
-LongToDecimalUpdater 0 0 0 5144.1 0.2 0.5X
+IntegerToDecimalUpdater 0 0 0 10247.8 0.1 1.0X
+LongToDecimalUpdater 0 0 0 5125.7 0.2 0.5X
FixedLenByteArrayToDecimalUpdater 21 21 0 50.7 19.7 0.0X
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 21 22 1 50.2 19.9 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.5 6.6 3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.5 7.8 2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary) 21 21 1 51.0 19.6 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.6 6.6 3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.7 7.8 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index fe4f716d2e15..5e2c31d2b566 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 20526.1 0.0 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3669.4 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1 0 2050.4 0.5 0.1X
-IntegerUpdater 0 0 0 10107.4 0.1 0.5X
-LongUpdater 0 0 0 5037.7 0.2 0.2X
-FloatUpdater 0 0 0 10239.0 0.1 0.5X
-DoubleUpdater 0 0 0 5095.8 0.2 0.2X
-BinaryUpdater 15 15 0 70.1 14.3 0.0X
+BooleanUpdater 0 0 0 14526.2 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3679.3 0.3 0.3X
+ShortUpdater (INT32 -> Short) 1 1 0 2054.1 0.5 0.1X
+IntegerUpdater 0 0 0 10178.0 0.1 0.7X
+LongUpdater 0 0 0 5054.4 0.2 0.3X
+FloatUpdater 0 0 0 10212.8 0.1 0.7X
+DoubleUpdater 0 0 0 5051.2 0.2 0.3X
+BinaryUpdater 15 15 0 68.4 14.6 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 1 1 0 1277.6 0.8 1.0X
-IntegerToDoubleUpdater 1 1 0 1475.2 0.7 1.2X
-FloatToDoubleUpdater 2 2 0 480.2 2.1 0.4X
-DateToTimestampNTZUpdater 36 36 0 29.0 34.5 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.0 2.2 0.4X
+IntegerToLongUpdater 1 1 0 1280.6 0.8 1.0X
+IntegerToDoubleUpdater 1 1 0 1537.9 0.7 1.2X
+FloatToDoubleUpdater 1 1 0 1418.8 0.7 1.1X
+DateToTimestampNTZUpdater 36 36 0 29.5 33.9 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.3 2.2 0.4X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2400.9 0.4 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2079.6 0.5 0.9X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.0 2.2 0.2X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2407.3 0.4 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2030.8 0.5 0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.4 2.2 0.2X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1091.4 0.9 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.0 16.9 0.1X
+UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1093.1 0.9 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.1 16.9 0.1X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10204.0 0.1 1.0X
-LongToDecimalUpdater 0 0 0 4883.9 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21 1 51.0 19.6 0.0X
+IntegerToDecimalUpdater 0 0 0 10195.9 0.1 1.0X
+LongToDecimalUpdater 0 0 0 5049.2 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21 0 51.0 19.6 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 55.2 18.1 1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 54.9 18.2 1.0X
FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.1 8.1 2.2X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.0 8.1 2.2X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 46a0d3dd212f..eba4b426a0d8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -949,9 +949,7 @@ public class ParquetVectorUpdaterFactory {
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
- for (int i = 0; i < total; ++i) {
- values.putDouble(offset + i, valuesReader.readFloat());
- }
+ valuesReader.readFloatsAsDoubles(total, values, offset);
}
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 0329b1ff8ebf..d91ba5e2b87d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -180,6 +180,18 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
+ @Override
+ public final void readFloatsAsDoubles(int total, WritableColumnVector c, int rowId) {
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+ // No `hasArray` bulk-copy path: source (float, 4 bytes) and target (double, 8 bytes)
+ // have different widths so a contiguous byte copy is impossible. Matches the pattern
+ // in `readIntegersAsLongs`.
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, buffer.getFloat());
+ }
+ }
+
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 8cc9a7dae2cd..a90e9bf01c81 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -94,6 +94,27 @@ public interface VectorizedValuesReader {
}
}
+ /**
+ * Reads {@code total} FLOAT values, widens each to a double, and writes them into
+ * {@code c} starting at {@code c[rowId]}. The widening is Java's primitive
+ * float-to-double conversion: exact for every finite and infinite float; a NaN
+ * float widens to a double NaN (the payload may be canonicalized by the JVM).
+ * Used by the type-converting updater that reads parquet FLOAT columns into
+ * Spark {@code DoubleType} targets.
+ *
+ * <p>The default implementation falls back to a per-row read+widen+write loop and is
+ * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+ * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+ * should override to read source bytes once and run a tight in-method conversion loop,
+ * avoiding {@code total} virtual dispatches on {@link #readFloat()}. Readers without
+ * an override preserve correctness but gain no speedup.
+ */
+ default void readFloatsAsDoubles(int total, WritableColumnVector c, int rowId) {
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, readFloat());
+ }
+ }
+
void readBinary(int total, WritableColumnVector c, int rowId);
void readGeometry(int total, WritableColumnVector c, int rowId);
void readGeography(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index bab3d4e36f5e..692306fb52f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1972,6 +1972,55 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
}
}
}
+
+ test("FLOAT -> Double widening end-to-end via vectorized read path") {
+ // Round-trips a FLOAT Parquet file read back as DoubleType, exercising
+ // FloatToDoubleUpdater. Same REQUIRED/OPTIONAL coverage as the INT32 -> Long
+ // sibling test above. The widening is Java's primitive float-to-double conversion;
+ // `checkAnswer`'s Double comparison treats any NaN as equal to any NaN, so NaN
+ // payload preservation is not asserted at this level.
+ withTempPath { file =>
+ val n = 5000
+ def sampleAt(i: Int): Float = i % 7 match {
+ case 0 => Float.MinValue
+ case 1 => -1.5f
+ case 2 => -0.0f
+ case 3 => 0.0f
+ case 4 => Float.MaxValue
+ case 5 => Float.NaN
+ case _ => i * 0.125f - 3.25f
+ }
+
+ val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+ // Every 11th row is null. Mixed with NaN entries above, this exercises the PACKED
+ // def-level path interleaving runs of values with runs of nulls.
+ val nullableData = (0 until n).map { i =>
+ if (i % 11 == 0) Row(null) else Row(sampleAt(i))
+ }
+
+ val nonNullWriteSchema = new StructType().add("v", FloatType, nullable = false)
+ val nonNullReadSchema = new StructType().add("v", DoubleType, nullable = false)
+ val nullableWriteSchema = new StructType().add("v", FloatType, nullable = true)
+ val nullableReadSchema = new StructType().add("v", DoubleType, nullable = true)
+
+ val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+ val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+ spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+ .write.parquet(nonNullPath)
+ spark.createDataFrame(spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+ .write.parquet(nullablePath)
+
+ val expectedNonNull = nonNullData.map(r => Row(r.getFloat(0).toDouble))
+ val expectedNullable = nullableData.map { r =>
+ if (r.isNullAt(0)) Row(null) else Row(r.getFloat(0).toDouble)
+ }
+
+ withAllParquetReaders {
+ checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+ checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 64a9b2032b65..2ed6f84e68b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
* whose `readValues` is a bulk delegation to `VectorizedValuesReader`:
* - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch)
* - `IntegerToDoubleUpdater` (INT32 -> Double)
+ * - `FloatToDoubleUpdater` (FLOAT -> Double)
*
* Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
* `readValue` path, and the factory's long-decimal dispatch
@@ -71,6 +72,13 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
buf.array()
}
+ private def plainFloatBytes(values: Array[Float]): Array[Byte] = {
+ val buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN)
+ var i = 0
+ while (i < values.length) { buf.putFloat(values(i)); i += 1 }
+ buf.array()
+ }
+
private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = {
val r = new VectorizedPlainValuesReader
r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
@@ -211,4 +219,95 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
assert(actual === input.map(_.toDouble))
}
+ // ---- FloatToDoubleUpdater: same bulk-path shape, source is FLOAT, target is DoubleType ----
+
+ // FLOAT column descriptor with no logical-type annotation.
+ private val floatDescriptor: ColumnDescriptor = {
+ val pt = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("col")
+ new ColumnDescriptor(Array("col"), pt, 0, 1)
+ }
+
+ // Reads `values.length` FLOATs through `FloatToDoubleUpdater.readValues` and returns the
+ // resulting double column.
+ private def readViaFloatToDoubleUpdater(values: Array[Float]): Array[Double] = {
+ val fac = newFactory(floatDescriptor)
+ val updater = fac.getUpdater(floatDescriptor, DataTypes.DoubleType)
+ val out = new OnHeapColumnVector(values.length.max(1), DataTypes.DoubleType)
+ val reader = newPlainReader(plainFloatBytes(values), values.length)
+ updater.readValues(values.length, 0, out, reader)
+ val result = new Array[Double](values.length)
+ var i = 0
+ while (i < values.length) { result(i) = out.getDouble(i); i += 1 }
+ result
+ }
+
+ // Sample mixes finite, signed-zero, NaN, and infinity to catch sign and special-value bugs.
+ private def floatSampleValues(n: Int): Array[Float] = {
+ val out = new Array[Float](n)
+ var i = 0
+ while (i < n) {
+ out(i) = i % 7 match {
+ case 0 => Float.MinValue
+ case 1 => -1.5f
+ case 2 => -0.0f
+ case 3 => 0.0f
+ case 4 => Float.MaxValue
+ case 5 => Float.NaN
+ case _ => i * 0.125f - 3.25f
+ }
+ i += 1
+ }
+ out
+ }
+
+ // Java's float-to-double widening is exact for finite/infinite values and produces
+ // some double NaN for a NaN input (the payload may be canonicalized). Use
+ // `java.lang.Double.compare` to give NaN well-defined equality and to distinguish
+ // -0.0 from +0.0.
+ private def assertDoublesEqual(actual: Array[Double], expected: Array[Double]): Unit = {
+ assert(actual.length === expected.length)
+ var i = 0
+ while (i < actual.length) {
+ assert(java.lang.Double.compare(actual(i), expected(i)) === 0,
+ s"mismatch at $i: actual=${actual(i)} expected=${expected(i)}")
+ i += 1
+ }
+ }
+
+ for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+ test(s"FloatToDoubleUpdater produces correct widened output (total=$n)") {
+ val input = floatSampleValues(n)
+ assertDoublesEqual(readViaFloatToDoubleUpdater(input), input.map(_.toDouble))
+ }
+ }
+
+ test("FloatToDoubleUpdater: readValue widens a single FLOAT -> Double") {
+ // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's
+ // run-of-1 path calls `readFloat()` directly rather than the bulk method.
+ val input = Array(0.0f, 1.5f, -1.5f, Float.MinValue, Float.MaxValue, Float.NaN)
+ val fac = newFactory(floatDescriptor)
+ val updater = fac.getUpdater(floatDescriptor, DataTypes.DoubleType)
+ val out = new OnHeapColumnVector(input.length, DataTypes.DoubleType)
+ val reader = newPlainReader(plainFloatBytes(input), input.length)
+ var i = 0
+ while (i < input.length) {
+ updater.readValue(i, out, reader)
+ i += 1
+ }
+ val actual = (0 until input.length).map(out.getDouble).toArray
+ assertDoublesEqual(actual, input.map(_.toDouble))
+ }
+
+ test("FloatToDoubleUpdater: special values (signed zeros, NaN, +/-Infinity) widen exactly") {
+ // -0.0f widens to -0.0d (distinct from +0.0d), and Float.NaN widens to a double NaN.
+ val input = Array(-0.0f, 0.0f, Float.NegativeInfinity, Float.PositiveInfinity, Float.NaN)
+ val actual = readViaFloatToDoubleUpdater(input)
+ val expected = input.map(_.toDouble)
+ assertDoublesEqual(actual, expected)
+ // Spot-check signed-zero preservation directly (===-based comparison would conflate).
+ assert(java.lang.Double.doubleToRawLongBits(actual(0)) ===
+ java.lang.Double.doubleToRawLongBits(-0.0d))
+ assert(java.lang.Double.doubleToRawLongBits(actual(1)) ===
+ java.lang.Double.doubleToRawLongBits(0.0d))
+ }
}