This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b86244834fc [SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark
6b86244834fc is described below

commit 6b86244834fcc589aac60260beb10061b744831a
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu May 15 13:18:16 2025 +0800

    [SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark
    
    ### What changes were proposed in this pull request?
    
    We found some unreasonable benchmark results while upgrading zstd-jni from 
1.5.6-10 to 1.5.7-x in https://github.com/apache/spark/pull/50057, and the 
author suggested using real-world data for the zstd compression benchmark.
    
    ### Why are the changes needed?
    
    Add a new benchmark for zstd with more reasonable data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested on a local machine, Ubuntu 24.04, Intel(R) Core(TM) i5-9500 CPU @ 
3.00GHz
    
    zstd-jni:1.5.6-10
    ```
    
================================================================================================
    Benchmark ZStandardCompressionCodec
    
================================================================================================
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
----------------------------------------------------------------------------------------------------------------------------------
    Compression 4 times at level 1 without buffer pool           2737           
2742           6          0.0   684299199.3       1.0X
    Compression 4 times at level 2 without buffer pool           4217           
4218           2          0.0  1054165072.5       0.6X
    Compression 4 times at level 3 without buffer pool           5660           
5661           2          0.0  1414928809.8       0.5X
    Compression 4 times at level 1 with buffer pool              2739           
2743           6          0.0   684719746.2       1.0X
    Compression 4 times at level 2 with buffer pool              4186           
4191           8          0.0  1046477235.5       0.7X
    Compression 4 times at level 3 with buffer pool              5663           
5667           5          0.0  1415762083.2       0.5X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
--------------------------------------------------------------------------------------------------------------------------------------
    Decompression 4 times from level 1 without buffer pool            943       
     950          10          0.0   235749387.0       1.0X
    Decompression 4 times from level 2 without buffer pool           1239       
    1244           6          0.0   309753079.0       0.8X
    Decompression 4 times from level 3 without buffer pool           1468       
    1484          23          0.0   366946390.8       0.6X
    Decompression 4 times from level 1 with buffer pool               933       
     942           9          0.0   233286880.8       1.0X
    Decompression 4 times from level 2 with buffer pool              1142       
    1171          40          0.0   285605190.0       0.8X
    Decompression 4 times from level 3 with buffer pool              1394       
    1404          13          0.0   348546518.3       0.7X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------------------------------
    Parallel Compression with 0 workers                1889           1899      
    14          0.0   472156817.0       1.0X
    Parallel Compression with 1 workers                1715           1717      
     2          0.0   428826617.0       1.1X
    Parallel Compression with 2 workers                 904            906      
     2          0.0   225890052.0       2.1X
    Parallel Compression with 4 workers                 539            548      
     8          0.0   134735732.5       3.5X
    Parallel Compression with 8 workers                 540            548      
     9          0.0   134889447.5       3.5X
    Parallel Compression with 16 workers                577            589      
    23          0.0   144182540.7       3.3X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------------------------------
    Parallel Compression with 0 workers                9555           9567      
    18          0.0  2388642623.3       1.0X
    Parallel Compression with 1 workers                7973           8006      
    47          0.0  1993145509.0       1.2X
    Parallel Compression with 2 workers                5070           5071      
     1          0.0  1267405763.3       1.9X
    Parallel Compression with 4 workers                4420           4421      
     1          0.0  1104977620.3       2.2X
    Parallel Compression with 8 workers                4790           4800      
    15          0.0  1197417939.0       2.0X
    Parallel Compression with 16 workers               5000           5003      
     5          0.0  1249965510.5       1.9X
    ```
    
    zstd-jni:1.5.7-3
    ```
    
================================================================================================
    Benchmark ZStandardCompressionCodec
    
================================================================================================
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
----------------------------------------------------------------------------------------------------------------------------------
    Compression 4 times at level 1 without buffer pool           2700           
2709          13          0.0   674967564.0       1.0X
    Compression 4 times at level 2 without buffer pool           4148           
4149           0          0.0  1037124857.0       0.7X
    Compression 4 times at level 3 without buffer pool           5660           
5682          31          0.0  1414968620.0       0.5X
    Compression 4 times at level 1 with buffer pool              2718           
2728          14          0.0   679514554.3       1.0X
    Compression 4 times at level 2 with buffer pool              4130           
4131           2          0.0  1032476406.2       0.7X
    Compression 4 times at level 3 with buffer pool              5571           
5576           6          0.0  1392871057.5       0.5X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
--------------------------------------------------------------------------------------------------------------------------------------
    Decompression 4 times from level 1 without buffer pool            942       
     951           9          0.0   235523684.5       1.0X
    Decompression 4 times from level 2 without buffer pool           1248       
    1270          31          0.0   311906360.5       0.8X
    Decompression 4 times from level 3 without buffer pool           1472       
    1475           4          0.0   368071680.5       0.6X
    Decompression 4 times from level 1 with buffer pool               939       
     956          18          0.0   234631810.0       1.0X
    Decompression 4 times from level 2 with buffer pool              1249       
    1261          16          0.0   312318610.5       0.8X
    Decompression 4 times from level 3 with buffer pool              1475       
    1475           0          0.0   368765939.3       0.6X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------------------------------
    Parallel Compression with 0 workers                1865           1873      
    11          0.0   466278397.5       1.0X
    Parallel Compression with 1 workers                1785           1793      
    10          0.0   446359936.8       1.0X
    Parallel Compression with 2 workers                 945            953      
    10          0.0   236142005.8       2.0X
    Parallel Compression with 4 workers                 559            577      
    29          0.0   139754505.5       3.3X
    Parallel Compression with 8 workers                 537            555      
    13          0.0   134328778.3       3.5X
    Parallel Compression with 16 workers                587            614      
    23          0.0   146784965.5       3.2X
    
    OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
    Intel(R) Core(TM) i5-9500 CPU  3.00GHz
    Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------------------------------
    Parallel Compression with 0 workers                9365           9375      
    14          0.0  2341247379.0       1.0X
    Parallel Compression with 1 workers                8022           8022      
     0          0.0  2005448255.8       1.2X
    Parallel Compression with 2 workers                5054           5069      
    22          0.0  1263445148.8       1.9X
    Parallel Compression with 4 workers                4372           4394      
    31          0.0  1092926980.8       2.1X
    Parallel Compression with 8 workers                4785           4805      
    28          0.0  1196282275.0       2.0X
    Parallel Compression with 16 workers               5012           5028      
    23          0.0  1252925049.5       1.9X
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50857 from pan3793/SPARK-52078.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .github/workflows/benchmark.yml                    |  21 ++--
 .../ZStandardTPCDSDataBenchmark-jdk21-results.txt  |  49 ++++++++
 .../ZStandardTPCDSDataBenchmark-results.txt        |  49 ++++++++
 .../spark/io/ZStandardTPCDSDataBenchmark.scala     | 123 +++++++++++++++++++++
 4 files changed, 235 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 9bfe79cfa2fe..6b2e72b3f23b 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -66,8 +66,8 @@ jobs:
 
   # Any TPC-DS related updates on this job need to be applied to tpcds-1g job 
of build_and_test.yml as well
   tpcds-1g-gen:
-    name: "Generate an input dataset for TPCDSQueryBenchmark with SF=1"
-    if: contains(inputs.class, 'TPCDSQueryBenchmark') || 
contains(inputs.class, '*')
+    name: "Generate an TPC-DS dataset with SF=1"
+    if: contains(inputs.class, 'TPCDSQueryBenchmark') || 
contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, 
'*')
     runs-on: ubuntu-latest
     env:
       SPARK_LOCAL_IP: localhost
@@ -98,7 +98,9 @@ jobs:
         id: cache-tpcds-sf-1
         uses: actions/cache@v4
         with:
-          path: ./tpcds-sf-1
+          path: |
+            ./tpcds-sf-1
+            ./tpcds-sf-1-text
           key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 
'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
       - name: Checkout tpcds-kit repository
         if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
@@ -118,7 +120,9 @@ jobs:
           java-version: ${{ inputs.jdk }}
       - name: Generate TPC-DS (SF=1) table data
         if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
-        run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData 
--dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 
--numPartitions 1 --overwrite"
+        run: |
+          build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData 
--dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 
--numPartitions 1 --overwrite"
+          mkdir -p `pwd`/tpcds-sf-1-text && rm -f `pwd`/tpcds-sf-1-text/* && 
`pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx 
-SCALE 1 -DIR `pwd`/tpcds-sf-1-text
 
   benchmark:
     name: "Run benchmarks: ${{ inputs.class }} (JDK ${{ inputs.jdk }}, Scala 
${{ inputs.scala }}, ${{ matrix.split }} out of ${{ inputs.num-splits }} 
splits)"
@@ -138,6 +142,7 @@ jobs:
       # To prevent spark.test.home not being set. See more detail in 
SPARK-36007.
       SPARK_HOME: ${{ github.workspace }}
       SPARK_TPCDS_DATA: ${{ github.workspace }}/tpcds-sf-1
+      SPARK_TPCDS_DATA_TEXT: ${{ github.workspace }}/tpcds-sf-1-text
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v4
@@ -167,11 +172,13 @@ jobs:
         distribution: zulu
         java-version: ${{ inputs.jdk }}
     - name: Cache TPC-DS generated data
-      if: contains(inputs.class, 'TPCDSQueryBenchmark') || 
contains(inputs.class, '*')
+      if: contains(inputs.class, 'TPCDSQueryBenchmark') || 
contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, 
'*')
       id: cache-tpcds-sf-1
       uses: actions/cache@v4
       with:
-        path: ./tpcds-sf-1
+        path: |
+          ./tpcds-sf-1
+          ./tpcds-sf-1-text
         key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 
'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
     - name: Run benchmarks
       run: |
@@ -188,7 +195,7 @@ jobs:
         # To keep the directory structure and file permissions, tar them
         # See also 
https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files
         echo "Preparing the benchmark results:"
-        tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar 
`git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 
--exclude-standard`
+        tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar 
`git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 
--exclude=tpcds-sf-1-text --exclude-standard`
     - name: Upload benchmark results
       uses: actions/upload-artifact@v4
       with:
diff --git a/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt 
b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt
new file mode 100644
index 000000000000..49606c5ab280
--- /dev/null
+++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt
@@ -0,0 +1,49 @@
+================================================================================================
+Benchmark ZStandardCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------
+Compression 4 times at level 1 without buffer pool           2539           
2541           2          0.0   634832028.5       1.0X
+Compression 4 times at level 2 without buffer pool           4157           
4188          44          0.0  1039277864.3       0.6X
+Compression 4 times at level 3 without buffer pool           6091           
6095           5          0.0  1522781623.3       0.4X
+Compression 4 times at level 1 with buffer pool              2536           
2540           5          0.0   634097186.3       1.0X
+Compression 4 times at level 2 with buffer pool              4147           
4150           4          0.0  1036639857.0       0.6X
+Compression 4 times at level 3 with buffer pool              6097           
6099           3          0.0  1524134426.0       0.4X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------
+Decompression 4 times from level 1 without buffer pool            886          
  902          23          0.0   221484611.2       1.0X
+Decompression 4 times from level 2 without buffer pool           1109          
 1130          30          0.0   277257788.3       0.8X
+Decompression 4 times from level 3 without buffer pool           1336          
 1359          32          0.0   334102921.8       0.7X
+Decompression 4 times from level 1 with buffer pool               858          
  868           9          0.0   214401966.0       1.0X
+Decompression 4 times from level 2 with buffer pool              1131          
 1140          12          0.0   282739707.3       0.8X
+Decompression 4 times from level 3 with buffer pool              1366          
 1375          12          0.0   341571527.0       0.6X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                2030           2033         
  4          0.0   507451934.3       1.0X
+Parallel Compression with 1 workers                1879           1882         
  4          0.0   469750208.3       1.1X
+Parallel Compression with 2 workers                 969            976         
 10          0.0   242174332.5       2.1X
+Parallel Compression with 4 workers                 711            713         
  2          0.0   177820489.8       2.9X
+Parallel Compression with 8 workers                 847            898         
 53          0.0   211649152.3       2.4X
+Parallel Compression with 16 workers                848            859         
 10          0.0   211876140.0       2.4X
+
+OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                8266           8278         
 16          0.0  2066565583.8       1.0X
+Parallel Compression with 1 workers                6933           6941         
 10          0.0  1733356075.3       1.2X
+Parallel Compression with 2 workers                3690           3691         
  1          0.0   922481882.3       2.2X
+Parallel Compression with 4 workers                3223           3231         
 11          0.0   805643345.5       2.6X
+Parallel Compression with 8 workers                3652           3656         
  7          0.0   912916115.3       2.3X
+Parallel Compression with 16 workers               3912           3950         
 54          0.0   977901486.2       2.1X
+
+
diff --git a/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt 
b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt
new file mode 100644
index 000000000000..d5091826fa9a
--- /dev/null
+++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt
@@ -0,0 +1,49 @@
+================================================================================================
+Benchmark ZStandardCompressionCodec
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------
+Compression 4 times at level 1 without buffer pool           2518           
2519           1          0.0   629582183.5       1.0X
+Compression 4 times at level 2 without buffer pool           4111           
4111           1          0.0  1027767031.5       0.6X
+Compression 4 times at level 3 without buffer pool           6146           
6160          19          0.0  1536532700.3       0.4X
+Compression 4 times at level 1 with buffer pool              2517           
2517           1          0.0   629208370.5       1.0X
+Compression 4 times at level 2 with buffer pool              4105           
4112          11          0.0  1026190298.0       0.6X
+Compression 4 times at level 3 with buffer pool              6154           
6157           5          0.0  1538378430.0       0.4X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------
+Decompression 4 times from level 1 without buffer pool            900          
  903           4          0.0   225055920.0       1.0X
+Decompression 4 times from level 2 without buffer pool           1161          
 1163           3          0.0   290146657.0       0.8X
+Decompression 4 times from level 3 without buffer pool           1399          
 1406          10          0.0   349650877.8       0.6X
+Decompression 4 times from level 1 with buffer pool               899          
  901           2          0.0   224627803.0       1.0X
+Decompression 4 times from level 2 with buffer pool              1165          
 1166           1          0.0   291335735.3       0.8X
+Decompression 4 times from level 3 with buffer pool              1398          
 1401           4          0.0   349578394.0       0.6X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                2061           2067         
  8          0.0   515297811.0       1.0X
+Parallel Compression with 1 workers                1843           1844         
  1          0.0   460705797.3       1.1X
+Parallel Compression with 2 workers                 961            972         
 16          0.0   240177085.3       2.1X
+Parallel Compression with 4 workers                 729            731         
  2          0.0   182208026.2       2.8X
+Parallel Compression with 8 workers                 781            800         
 18          0.0   195212932.0       2.6X
+Parallel Compression with 16 workers                865            871         
  6          0.0   216145271.5       2.4X
+
+OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                8557           8635         
110          0.0  2139353975.8       1.0X
+Parallel Compression with 1 workers                7156           7193         
 52          0.0  1789023949.5       1.2X
+Parallel Compression with 2 workers                3855           3861         
  9          0.0   963635046.3       2.2X
+Parallel Compression with 4 workers                3248           3253         
  8          0.0   811889324.8       2.6X
+Parallel Compression with 8 workers                3667           3671         
  6          0.0   916671282.5       2.3X
+Parallel Compression with 16 workers               3799           3845         
 65          0.0   949757174.5       2.3X
+
+
diff --git 
a/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala 
b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala
new file mode 100644
index 000000000000..69820c15be30
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectOutputStream, OutputStream}
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import 
org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, 
IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}
+
+/**
+ * Benchmark for ZStandard codec performance using real-world data: the TPC-DS
+ * `catalog_sales.dat` text file.
+ *
+ * The environment variable `SPARK_TPCDS_DATA_TEXT` must point to a directory that contains
+ * `catalog_sales.dat`, e.g. a directory populated by
+ * `tpcds-kit/tools/dsdgen -DISTRIBUTIONS tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR <dir>`.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "core/Test/runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/ZStandardTPCDSDataBenchmark-results.txt".
+ * }}}
+ */
+object ZStandardTPCDSDataBenchmark extends BenchmarkBase {
+
+  // Number of times the input is compressed or decompressed inside one measured case. The same
+  // value is passed to every Benchmark as its second constructor argument.
+  val N: Int = 4
+
+  // Name of the environment variable holding the directory with the TPC-DS text files.
+  private val dataDirEnv = "SPARK_TPCDS_DATA_TEXT"
+
+  // the size of TPCDS catalog_sales.dat (SF1) is about 283M
+  val data: Array[Byte] = {
+    // Same exception type as a plain `sys.env(...)` lookup, but with an actionable message.
+    val dir = sys.env.getOrElse(dataDirEnv, throw new NoSuchElementException(
+      s"Environment variable $dataDirEnv is not set; it must point to the directory " +
+        "containing the TPC-DS text file catalog_sales.dat"))
+    Files.readAllBytes(Paths.get(dir, "catalog_sales.dat"))
+  }
+
+  /**
+   * Runs the compression cases, then the decompression cases, then the parallel compression
+   * cases, all inside a single `runBenchmark` section.
+   *
+   * @param mainArgs command line arguments; not used by this benchmark
+   */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val name = "Benchmark ZStandardCompressionCodec"
+    runBenchmark(name) {
+      val benchmark1 = new Benchmark(name, N, output = output)
+      compressionBenchmark(benchmark1, N)
+      benchmark1.run()
+
+      val benchmark2 = new Benchmark(name, N, output = output)
+      decompressionBenchmark(benchmark2, N)
+      benchmark2.run()
+      parallelCompressionBenchmark()
+    }
+  }
+
+  /**
+   * Registers one case per (buffer pool on/off, level 1..3) combination. Each case compresses
+   * `data` into a discarding output stream `N` times, using a fresh codec and stream per round.
+   *
+   * @param benchmark the benchmark to add the cases to
+   * @param N number of compression rounds per case
+   */
+  private def compressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
+    Seq(false, true).foreach { enablePool =>
+      Seq(1, 2, 3).foreach { level =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+        val condition = if (enablePool) "with" else "without"
+        benchmark.addCase(s"Compression $N times at level $level $condition buffer pool") { _ =>
+          // `1 to N` is inclusive, so the data really is compressed N times as the case name
+          // states; `1 until N` would run only N - 1 rounds.
+          (1 to N).foreach { _ =>
+            val os = new ZStdCompressionCodec(conf)
+              .compressedOutputStream(OutputStream.nullOutputStream())
+            try {
+              os.write(data)
+            } finally {
+              os.close()
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers one case per (buffer pool on/off, level 1..3) combination. The input is compressed
+   * once up front, outside the measured closure; each case then fully reads the compressed bytes
+   * back `N` times, using a fresh codec and stream per round.
+   *
+   * @param benchmark the benchmark to add the cases to
+   * @param N number of decompression rounds per case
+   */
+  private def decompressionBenchmark(benchmark: Benchmark, N: Int): Unit = {
+    Seq(false, true).foreach { enablePool =>
+      Seq(1, 2, 3).foreach { level =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+        val outputStream = new ByteArrayOutputStream()
+        val out = new ZStdCompressionCodec(conf).compressedOutputStream(outputStream)
+        try {
+          out.write(data)
+        } finally {
+          out.close()
+        }
+        val bytes = outputStream.toByteArray
+
+        val condition = if (enablePool) "with" else "without"
+        benchmark.addCase(s"Decompression $N times from level $level $condition buffer pool") { _ =>
+          // Inclusive range: N rounds, matching the case name.
+          (1 to N).foreach { _ =>
+            val bais = new ByteArrayInputStream(bytes)
+            val is = new ZStdCompressionCodec(conf).compressedInputStream(bais)
+            try {
+              is.readAllBytes()
+            } finally {
+              is.close()
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * For each of the levels 3 and 9, runs a separate benchmark with one case per worker count.
+   * Each case serializes `data` `N` times through an `ObjectOutputStream` that wraps a
+   * compressed stream writing to a discarding output stream.
+   */
+  private def parallelCompressionBenchmark(): Unit = {
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(
+        s"Parallel Compression at level $level", N, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
+        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
+          val os = OutputStream.nullOutputStream()
+          val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(os)
+          val oos = new ObjectOutputStream(zcos)
+          try {
+            (1 to N).foreach { _ =>
+              // writeUnshared serializes the full array on every call. writeObject would
+              // serialize it only once and emit a small back-reference handle for each repeat,
+              // so the data would be compressed a single time instead of N times.
+              oos.writeUnshared(data)
+            }
+          } finally {
+            oos.close()
+          }
+        }
+      }
+      benchmark.run()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to