This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dc57455589a9 [SPARK-54571][CORE][SQL] Use LZ4 safeDecompressor to mitigate perf regression
dc57455589a9 is described below
commit dc57455589a9f160638ab3526ca359eb84f76d67
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Feb 27 20:27:21 2026 +0800
[SPARK-54571][CORE][SQL] Use LZ4 safeDecompressor to mitigate perf regression
### What changes were proposed in this pull request?
Previously, lz4-java was upgraded to 1.10.x to address CVEs:
- https://github.com/apache/spark/pull/53327
- https://github.com/apache/spark/pull/53347
- https://github.com/apache/spark/pull/53971
However, this caused a significant performance drop; see the benchmark report at
- https://github.com/apache/spark/pull/53453
This PR follows the [suggestion](https://github.com/apache/spark/pull/53290#issuecomment-3607045004) to migrate to safeDecompressor.
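The migration can be sketched as a small round trip with lz4-java's builder API (the `newBuilder()`, `withDecompressor()`, and `safeDecompressor()` calls are the ones used in the diff below). This is an illustrative sketch only, assuming lz4-java 1.10.x on the classpath; it is not part of the patch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4Factory;

public class SafeDecompressorSketch {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello, lz4".getBytes(StandardCharsets.UTF_8);

        // Compress the payload into an in-memory LZ4 block stream.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (LZ4BlockOutputStream out = new LZ4BlockOutputStream(bos)) {
            out.write(payload);
        }

        // Old construction passed lz4Factory.fastDecompressor() to the
        // LZ4BlockInputStream constructor; the safe decompressor variant
        // validates the compressed input while decoding.
        LZ4Factory factory = LZ4Factory.fastestInstance();
        byte[] roundTrip;
        try (LZ4BlockInputStream in = LZ4BlockInputStream.newBuilder()
                .withDecompressor(factory.safeDecompressor())
                .build(new ByteArrayInputStream(bos.toByteArray()))) {
            roundTrip = in.readAllBytes();
        }
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```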
### Why are the changes needed?
Mitigate performance regression.
### Does this PR introduce _any_ user-facing change?
No, except for performance.
### How was this patch tested?
GHA for functionality, [benchmark](https://github.com/apache/spark/pull/53453#issuecomment-3645530618) for performance.
> TL;DR - my test results show lz4-java 1.10.1 is about 10~15% slower on lz4 compression than 1.8.0, and about 5% slower on lz4 decompression even after migrating to the suggested safeDecompressor
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53454 from pan3793/SPARK-54571.
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: pan3793 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt | 4 ++--
core/benchmarks/LZ4TPCDSDataBenchmark-results.txt | 4 ++--
.../src/main/scala/org/apache/spark/io/CompressionCodec.scala | 11 +++++------
.../apache/spark/sql/catalyst/plans/logical/Statistics.scala | 7 +++++--
4 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
index 578f710b1da5..a066ced506b5 100644
--- a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
+++ b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
@@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Compression:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Compression 4 times                                2612           2624          17          0.0   652960433.5       1.0X
+Compression 4 times                                2611           2619          11          0.0   652760335.3       1.0X
 
 OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Decompression:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Decompression 4 times                              2219           2220           1          0.0   554762743.5       1.0X
+Decompression 4 times                               896            912          20          0.0   224050201.0       1.0X
diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
index 7a5c89955eeb..8d909dfc7f49 100644
--- a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
+++ b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
@@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Compression:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Compression 4 times                                2605           2611           8          0.0   651243236.8       1.0X
+Compression 4 times                                2602           2609          10          0.0   650438397.0       1.0X
 
 OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Decompression:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Decompression 4 times                              2361           2367          10          0.0   590134148.0       1.0X
+Decompression 4 times                               938            966          33          0.0   234424499.5       1.0X
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index b81e46667323..243ee689cb5f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -152,12 +152,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
-    val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(
-      s,
-      lz4Factory.fastDecompressor(),
-      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
-      disableConcatenationOfByteStream)
+    LZ4BlockInputStream.newBuilder()
+      .withDecompressor(lz4Factory.safeDecompressor())
+      .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum)
+      .withStopOnEmptyBlock(false)
+      .build(s)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index a2850a0b179f..d88bc62d0bc1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import java.math.{MathContext, RoundingMode}
import java.util.Base64
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.expressions._
@@ -210,7 +210,10 @@ object HistogramSerializer {
   final def deserialize(str: String): Histogram = {
     val bytes = Base64.getDecoder().decode(str)
     val bis = new ByteArrayInputStream(bytes)
-    val ins = new DataInputStream(new LZ4BlockInputStream(bis))
+    val ins = new DataInputStream(
+      LZ4BlockInputStream.newBuilder()
+        .withDecompressor(LZ4Factory.fastestInstance().safeDecompressor())
+        .build(bis))
     val height = ins.readDouble()
     val numBins = ins.readInt()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]