This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6b86244834fc [SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark
6b86244834fc is described below

commit 6b86244834fcc589aac60260beb10061b744831a
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu May 15 13:18:16 2025 +0800

[SPARK-52078][TEST] Add ZStandardTPCDSDataBenchmark

### What changes were proposed in this pull request?

We found some unreasonable benchmark results while upgrading zstd-jni from 1.5.6-10 to 1.5.7-x in https://github.com/apache/spark/pull/50057, and the author suggests using real-world data for the zstd compression benchmark.

### Why are the changes needed?

Add a new benchmark for zstd with more reasonable data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested on a local machine, Ubuntu 24.04, Intel(R) Core(TM) i5-9500 CPU 3.00GHz

zstd-jni:1.5.6-10
```
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool 2737 2742 6 0.0 684299199.3 1.0X
Compression 4 times at level 2 without buffer pool 4217 4218 2 0.0 1054165072.5 0.6X
Compression 4 times at level 3 without buffer pool 5660 5661 2 0.0 1414928809.8 0.5X
Compression 4 times at level 1 with buffer pool 2739 2743 6 0.0 684719746.2 1.0X
Compression 4 times at level 2 with buffer pool 4186 4191 8 0.0 1046477235.5 0.7X
Compression 4 times at level 3 with buffer pool 5663 5667 5 0.0 1415762083.2 0.5X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool 943 950 10 0.0 235749387.0 1.0X
Decompression 4 times from level 2 without buffer pool 1239 1244 6 0.0 309753079.0 0.8X
Decompression 4 times from level 3 without buffer pool 1468 1484 23 0.0 366946390.8 0.6X
Decompression 4 times from level 1 with buffer pool 933 942 9 0.0 233286880.8 1.0X
Decompression 4 times from level 2 with buffer pool 1142 1171 40 0.0 285605190.0 0.8X
Decompression 4 times from level 3 with buffer pool 1394 1404 13 0.0 348546518.3 0.7X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 1889 1899 14 0.0 472156817.0 1.0X
Parallel Compression with 1 workers 1715 1717 2 0.0 428826617.0 1.1X
Parallel Compression with 2 workers 904 906 2 0.0 225890052.0 2.1X
Parallel Compression with 4 workers 539 548 8 0.0 134735732.5 3.5X
Parallel Compression with 8 workers 540 548 9 0.0 134889447.5 3.5X
Parallel Compression with 16 workers 577 589 23 0.0 144182540.7 3.3X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 9555 9567 18 0.0 2388642623.3 1.0X
Parallel Compression with 1 workers 7973 8006 47 0.0 1993145509.0 1.2X
Parallel Compression with 2 workers 5070 5071 1 0.0 1267405763.3 1.9X
Parallel Compression with 4 workers 4420 4421 1 0.0 1104977620.3 2.2X
Parallel Compression with 8 workers 4790 4800 15 0.0 1197417939.0 2.0X
Parallel Compression with 16 workers 5000 5003 5 0.0 1249965510.5 1.9X
```

zstd-jni:1.5.7-3
```
================================================================================================
Benchmark ZStandardCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------
Compression 4 times at level 1 without buffer pool 2700 2709 13 0.0 674967564.0 1.0X
Compression 4 times at level 2 without buffer pool 4148 4149 0 0.0 1037124857.0 0.7X
Compression 4 times at level 3 without buffer pool 5660 5682 31 0.0 1414968620.0 0.5X
Compression 4 times at level 1 with buffer pool 2718 2728 14 0.0 679514554.3 1.0X
Compression 4 times at level 2 with buffer pool 4130 4131 2 0.0 1032476406.2 0.7X
Compression 4 times at level 3 with buffer pool 5571 5576 6 0.0 1392871057.5 0.5X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Decompression 4 times from level 1 without buffer pool 942 951 9 0.0 235523684.5 1.0X
Decompression 4 times from level 2 without buffer pool 1248 1270 31 0.0 311906360.5 0.8X
Decompression 4 times from level 3 without buffer pool 1472 1475 4 0.0 368071680.5 0.6X
Decompression 4 times from level 1 with buffer pool 939 956 18 0.0 234631810.0 1.0X
Decompression 4 times from level 2 with buffer pool 1249 1261 16 0.0 312318610.5 0.8X
Decompression 4 times from level 3 with buffer pool 1475 1475 0 0.0 368765939.3 0.6X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 1865 1873 11 0.0 466278397.5 1.0X
Parallel Compression with 1 workers 1785 1793 10 0.0 446359936.8 1.0X
Parallel Compression with 2 workers 945 953 10 0.0 236142005.8 2.0X
Parallel Compression with 4 workers 559 577 29 0.0 139754505.5 3.3X
Parallel Compression with 8 workers 537 555 13 0.0 134328778.3 3.5X
Parallel Compression with 16 workers 587 614 23 0.0 146784965.5 3.2X

OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.12.10-76061203-generic
Intel(R) Core(TM) i5-9500 CPU 3.00GHz
Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parallel Compression with 0 workers 9365 9375 14 0.0 2341247379.0 1.0X
Parallel Compression with 1 workers 8022 8022 0 0.0 2005448255.8 1.2X
Parallel Compression with 2 workers 5054 5069 22 0.0 1263445148.8 1.9X
Parallel Compression with 4 workers 4372 4394 31 0.0 1092926980.8 2.1X
Parallel Compression with 8 workers 4785 4805 28 0.0 1196282275.0 2.0X
Parallel Compression with 16 workers 5012 5028 23 0.0 1252925049.5 1.9X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50857 from pan3793/SPARK-52078.
Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .github/workflows/benchmark.yml | 21 ++-- .../ZStandardTPCDSDataBenchmark-jdk21-results.txt | 49 ++++++++ .../ZStandardTPCDSDataBenchmark-results.txt | 49 ++++++++ .../spark/io/ZStandardTPCDSDataBenchmark.scala | 123 +++++++++++++++++++++ 4 files changed, 235 insertions(+), 7 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9bfe79cfa2fe..6b2e72b3f23b 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -66,8 +66,8 @@ jobs: # Any TPC-DS related updates on this job need to be applied to tpcds-1g job of build_and_test.yml as well tpcds-1g-gen: - name: "Generate an input dataset for TPCDSQueryBenchmark with SF=1" - if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*') + name: "Generate an TPC-DS dataset with SF=1" + if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*') runs-on: ubuntu-latest env: SPARK_LOCAL_IP: localhost @@ -98,7 +98,9 @@ jobs: id: cache-tpcds-sf-1 uses: actions/cache@v4 with: - path: ./tpcds-sf-1 + path: | + ./tpcds-sf-1 + ./tpcds-sf-1-text key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Checkout tpcds-kit repository if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' @@ -118,7 +120,9 @@ jobs: java-version: ${{ inputs.jdk }} - name: Generate TPC-DS (SF=1) table data if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' - run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite" + run: | + build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite" + 
mkdir -p `pwd`/tpcds-sf-1-text && rm -f `pwd`/tpcds-sf-1-text/* && `pwd`/tpcds-kit/tools/dsdgen -DISTRIBUTIONS `pwd`/tpcds-kit/tools/tpcds.idx -SCALE 1 -DIR `pwd`/tpcds-sf-1-text benchmark: name: "Run benchmarks: ${{ inputs.class }} (JDK ${{ inputs.jdk }}, Scala ${{ inputs.scala }}, ${{ matrix.split }} out of ${{ inputs.num-splits }} splits)" @@ -138,6 +142,7 @@ jobs: # To prevent spark.test.home not being set. See more detail in SPARK-36007. SPARK_HOME: ${{ github.workspace }} SPARK_TPCDS_DATA: ${{ github.workspace }}/tpcds-sf-1 + SPARK_TPCDS_DATA_TEXT: ${{ github.workspace }}/tpcds-sf-1-text steps: - name: Checkout Spark repository uses: actions/checkout@v4 @@ -167,11 +172,13 @@ jobs: distribution: zulu java-version: ${{ inputs.jdk }} - name: Cache TPC-DS generated data - if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, '*') + if: contains(inputs.class, 'TPCDSQueryBenchmark') || contains(inputs.class, 'ZStandardTPCDSDataBenchmark') || contains(inputs.class, '*') id: cache-tpcds-sf-1 uses: actions/cache@v4 with: - path: ./tpcds-sf-1 + path: | + ./tpcds-sf-1 + ./tpcds-sf-1-text key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Run benchmarks run: | @@ -188,7 +195,7 @@ jobs: # To keep the directory structure and file permissions, tar them # See also https://github.com/actions/upload-artifact#maintaining-file-permissions-and-case-sensitive-files echo "Preparing the benchmark results:" - tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard` + tar -cvf benchmark-results-${{ inputs.jdk }}-${{ inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude=tpcds-sf-1-text --exclude-standard` - name: Upload benchmark results uses: actions/upload-artifact@v4 with: diff --git 
a/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt new file mode 100644 index 000000000000..49606c5ab280 --- /dev/null +++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-jdk21-results.txt @@ -0,0 +1,49 @@ +================================================================================================ +Benchmark ZStandardCompressionCodec +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure +AMD EPYC 7763 64-Core Processor +Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +Compression 4 times at level 1 without buffer pool 2539 2541 2 0.0 634832028.5 1.0X +Compression 4 times at level 2 without buffer pool 4157 4188 44 0.0 1039277864.3 0.6X +Compression 4 times at level 3 without buffer pool 6091 6095 5 0.0 1522781623.3 0.4X +Compression 4 times at level 1 with buffer pool 2536 2540 5 0.0 634097186.3 1.0X +Compression 4 times at level 2 with buffer pool 4147 4150 4 0.0 1036639857.0 0.6X +Compression 4 times at level 3 with buffer pool 6097 6099 3 0.0 1524134426.0 0.4X + +OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure +AMD EPYC 7763 64-Core Processor +Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------- +Decompression 4 times from level 1 without buffer pool 886 902 23 0.0 221484611.2 1.0X +Decompression 4 times from level 2 without buffer pool 1109 1130 30 0.0 277257788.3 0.8X +Decompression 4 times from level 3 without buffer pool 1336 1359 32 0.0 334102921.8 0.7X +Decompression 4 times from 
level 1 with buffer pool 858 868 9 0.0 214401966.0 1.0X +Decompression 4 times from level 2 with buffer pool 1131 1140 12 0.0 282739707.3 0.8X +Decompression 4 times from level 3 with buffer pool 1366 1375 12 0.0 341571527.0 0.6X + +OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure +AMD EPYC 7763 64-Core Processor +Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Parallel Compression with 0 workers 2030 2033 4 0.0 507451934.3 1.0X +Parallel Compression with 1 workers 1879 1882 4 0.0 469750208.3 1.1X +Parallel Compression with 2 workers 969 976 10 0.0 242174332.5 2.1X +Parallel Compression with 4 workers 711 713 2 0.0 177820489.8 2.9X +Parallel Compression with 8 workers 847 898 53 0.0 211649152.3 2.4X +Parallel Compression with 16 workers 848 859 10 0.0 211876140.0 2.4X + +OpenJDK 64-Bit Server VM 21.0.7+6-LTS on Linux 6.11.0-1013-azure +AMD EPYC 7763 64-Core Processor +Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Parallel Compression with 0 workers 8266 8278 16 0.0 2066565583.8 1.0X +Parallel Compression with 1 workers 6933 6941 10 0.0 1733356075.3 1.2X +Parallel Compression with 2 workers 3690 3691 1 0.0 922481882.3 2.2X +Parallel Compression with 4 workers 3223 3231 11 0.0 805643345.5 2.6X +Parallel Compression with 8 workers 3652 3656 7 0.0 912916115.3 2.3X +Parallel Compression with 16 workers 3912 3950 54 0.0 977901486.2 2.1X + + diff --git a/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt new file mode 100644 index 000000000000..d5091826fa9a --- /dev/null +++ b/core/benchmarks/ZStandardTPCDSDataBenchmark-results.txt @@ -0,0 +1,49 
@@ +================================================================================================ +Benchmark ZStandardCompressionCodec +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure +AMD EPYC 7763 64-Core Processor +Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +Compression 4 times at level 1 without buffer pool 2518 2519 1 0.0 629582183.5 1.0X +Compression 4 times at level 2 without buffer pool 4111 4111 1 0.0 1027767031.5 0.6X +Compression 4 times at level 3 without buffer pool 6146 6160 19 0.0 1536532700.3 0.4X +Compression 4 times at level 1 with buffer pool 2517 2517 1 0.0 629208370.5 1.0X +Compression 4 times at level 2 with buffer pool 4105 4112 11 0.0 1026190298.0 0.6X +Compression 4 times at level 3 with buffer pool 6154 6157 5 0.0 1538378430.0 0.4X + +OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure +AMD EPYC 7763 64-Core Processor +Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------- +Decompression 4 times from level 1 without buffer pool 900 903 4 0.0 225055920.0 1.0X +Decompression 4 times from level 2 without buffer pool 1161 1163 3 0.0 290146657.0 0.8X +Decompression 4 times from level 3 without buffer pool 1399 1406 10 0.0 349650877.8 0.6X +Decompression 4 times from level 1 with buffer pool 899 901 2 0.0 224627803.0 1.0X +Decompression 4 times from level 2 with buffer pool 1165 1166 1 0.0 291335735.3 0.8X +Decompression 4 times from level 3 with buffer pool 1398 1401 4 0.0 349578394.0 0.6X + +OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 
6.11.0-1014-azure +AMD EPYC 7763 64-Core Processor +Parallel Compression at level 3: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Parallel Compression with 0 workers 2061 2067 8 0.0 515297811.0 1.0X +Parallel Compression with 1 workers 1843 1844 1 0.0 460705797.3 1.1X +Parallel Compression with 2 workers 961 972 16 0.0 240177085.3 2.1X +Parallel Compression with 4 workers 729 731 2 0.0 182208026.2 2.8X +Parallel Compression with 8 workers 781 800 18 0.0 195212932.0 2.6X +Parallel Compression with 16 workers 865 871 6 0.0 216145271.5 2.4X + +OpenJDK 64-Bit Server VM 17.0.15+6-LTS on Linux 6.11.0-1014-azure +AMD EPYC 7763 64-Core Processor +Parallel Compression at level 9: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Parallel Compression with 0 workers 8557 8635 110 0.0 2139353975.8 1.0X +Parallel Compression with 1 workers 7156 7193 52 0.0 1789023949.5 1.2X +Parallel Compression with 2 workers 3855 3861 9 0.0 963635046.3 2.2X +Parallel Compression with 4 workers 3248 3253 8 0.0 811889324.8 2.6X +Parallel Compression with 8 workers 3667 3671 6 0.0 916671282.5 2.3X +Parallel Compression with 16 workers 3799 3845 65 0.0 949757174.5 2.3X + + diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala new file mode 100644 index 000000000000..69820c15be30 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/io/ZStandardTPCDSDataBenchmark.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.io + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream, OutputStream} +import java.nio.file.{Files, Paths} + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS} + +/** + * Benchmark for ZStandard codec performance. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class <this class> <spark core test jar> + * 2. build/sbt "core/Test/runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>" + * Results will be written to "benchmarks/ZStandardTPCDSDataBenchmark-results.txt". 
+ * }}} + */ +object ZStandardTPCDSDataBenchmark extends BenchmarkBase { + + val N = 4 + + // the size of TPCDS catalog_sales.dat (SF1) is about 283M + val data = Files.readAllBytes(Paths.get(sys.env("SPARK_TPCDS_DATA_TEXT"), "catalog_sales.dat")) + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val name = "Benchmark ZStandardCompressionCodec" + runBenchmark(name) { + val benchmark1 = new Benchmark(name, N, output = output) + compressionBenchmark(benchmark1, N) + benchmark1.run() + + val benchmark2 = new Benchmark(name, N, output = output) + decompressionBenchmark(benchmark2, N) + benchmark2.run() + parallelCompressionBenchmark() + } + } + + private def compressionBenchmark(benchmark: Benchmark, N: Int): Unit = { + Seq(false, true).foreach { enablePool => + Seq(1, 2, 3).foreach { level => + val conf = new SparkConf(false) + .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool) + .set(IO_COMPRESSION_ZSTD_LEVEL, level) + val condition = if (enablePool) "with" else "without" + benchmark.addCase(s"Compression $N times at level $level $condition buffer pool") { _ => + (1 until N).foreach { _ => + val os = new ZStdCompressionCodec(conf) + .compressedOutputStream(OutputStream.nullOutputStream()) + os.write(data) + os.close() + } + } + } + } + } + + private def decompressionBenchmark(benchmark: Benchmark, N: Int): Unit = { + Seq(false, true).foreach { enablePool => + Seq(1, 2, 3).foreach { level => + val conf = new SparkConf(false) + .set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, enablePool) + .set(IO_COMPRESSION_ZSTD_LEVEL, level) + val outputStream = new ByteArrayOutputStream() + val out = new ZStdCompressionCodec(conf).compressedOutputStream(outputStream) + out.write(data) + out.close() + val bytes = outputStream.toByteArray + + val condition = if (enablePool) "with" else "without" + benchmark.addCase(s"Decompression $N times from level $level $condition buffer pool") { _ => + (1 until N).foreach { _ => + val bais = new 
ByteArrayInputStream(bytes) + val is = new ZStdCompressionCodec(conf).compressedInputStream(bais) + is.readAllBytes() + is.close() + } + } + } + } + } + + private def parallelCompressionBenchmark(): Unit = { + Seq(3, 9).foreach { level => + val benchmark = new Benchmark( + s"Parallel Compression at level $level", N, output = output) + Seq(0, 1, 2, 4, 8, 16).foreach { workers => + val conf = new SparkConf(false) + .set(IO_COMPRESSION_ZSTD_LEVEL, level) + .set(IO_COMPRESSION_ZSTD_WORKERS, workers) + benchmark.addCase(s"Parallel Compression with $workers workers") { _ => + val os = OutputStream.nullOutputStream() + val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(os) + val oos = new ObjectOutputStream(zcos) + 1 to N foreach { _ => + oos.writeObject(data) + } + oos.close() + } + } + benchmark.run() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org