This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc867266f08 [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs
fc867266f08 is described below
commit fc867266f0898866ab5ff7ed82b0c7c5fbaccefc
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Nov 6 18:01:11 2023 +0800
[SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports only a subset of the Hadoop compression codecs, and
the two sets do not map one-to-one because Spark introduces two pseudo codecs,
none and uncompressed, that have no Hadoop counterpart.
The codec names are also magic strings copied from the Hadoop compression
codecs, so developers have to keep them consistent with Hadoop by hand, which
is error-prone and reduces development efficiency.
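As a minimal sketch (assuming the new enum added below is on the classpath),
writer code that used to hard-code the codec name can derive it from the enum
instead; the output path here is just illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP

object CompressionOptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("codec sketch").getOrCreate()
    val df = spark.range(0, 10).selectExpr("CAST(id AS STRING) AS s")
    // Before: df.write.option("compression", "gzip") relied on a magic string.
    // After: the shared enum supplies the name, so it cannot drift from Hadoop's.
    df.write.option("compression", GZIP.lowerCaseName()).mode("overwrite").text("/tmp/codec-sketch")
    spark.stop()
  }
}
```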
### Why are the changes needed?
Make it easier for developers to use the Hadoop compression codecs without
relying on magic strings.
### Does this PR introduce _any_ user-facing change?
'No'.
It only introduces a new internal class.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43620 from beliefer/SPARK-45758.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../sql/catalyst/util/HadoopCompressionCodec.java | 63 ++++++++++++++++++++++
.../sql/catalyst/util/CompressionCodecs.scala | 12 ++---
.../org/apache/spark/sql/DataFrameSuite.scala | 4 +-
.../benchmark/DataSourceReadBenchmark.scala | 8 ++-
.../sql/execution/datasources/csv/CSVSuite.scala | 4 +-
.../sql/execution/datasources/json/JsonSuite.scala | 4 +-
.../sql/execution/datasources/text/TextSuite.scala | 10 ++--
.../datasources/text/WholeTextFileSuite.scala | 3 +-
8 files changed, 87 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
new file mode 100644
index 00000000000..ee4cb4da322
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+/**
+ * A mapper class from Spark supported compression codec names to Hadoop compression codecs.
+ */
+public enum HadoopCompressionCodec {
+ NONE(null),
+ UNCOMPRESSED(null),
+ BZIP2(new BZip2Codec()),
+ DEFLATE(new DeflateCodec()),
+ GZIP(new GzipCodec()),
+ LZ4(new Lz4Codec()),
+ SNAPPY(new SnappyCodec());
+
+  // TODO: support ZStandardCodec
+
+ private final CompressionCodec compressionCodec;
+
+ HadoopCompressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ }
+
+ public CompressionCodec getCompressionCodec() {
+ return this.compressionCodec;
+ }
+
+ private static final Map<String, String> codecNameMap =
+ Arrays.stream(HadoopCompressionCodec.values()).collect(
+      Collectors.toMap(Enum::name, codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+ public String lowerCaseName() {
+ return codecNameMap.get(this.name());
+ }
+}
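A quick sketch of the new API surface (expected values taken from the Hadoop
codecs wrapped above; Hadoop's GzipCodec reports ".gz" as its default
extension):

```scala
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.{GZIP, NONE}

// lowerCaseName() yields the short name that the "compression" option expects.
assert(GZIP.lowerCaseName() == "gzip")
// The wrapped Hadoop codec exposes Hadoop-side metadata such as the default
// file extension; the two pseudo codecs wrap no Hadoop codec at all.
assert(GZIP.getCompressionCodec.getDefaultExtension == ".gz")
assert(NONE.getCompressionCodec == null)
```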
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 1377a03d93b..a1d6446cc10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -21,19 +21,13 @@ import java.util.Locale
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress._
import org.apache.spark.util.Utils
object CompressionCodecs {
- private val shortCompressionCodecNames = Map(
- "none" -> null,
- "uncompressed" -> null,
- "bzip2" -> classOf[BZip2Codec].getName,
- "deflate" -> classOf[DeflateCodec].getName,
- "gzip" -> classOf[GzipCodec].getName,
- "lz4" -> classOf[Lz4Codec].getName,
- "snappy" -> classOf[SnappyCodec].getName)
+  private val shortCompressionCodecNames = HadoopCompressionCodec.values().map { codec =>
+    codec.lowerCaseName() -> Option(codec.getCompressionCodec).map(_.getClass.getName).orNull
+  }.toMap
/**
* Return the full version of the given codec class.
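The derived map keeps exactly the entries of the hand-written map removed
above; a minimal sketch of the equivalence (class names copied from the
removed lines):

```scala
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec

val derived: Map[String, String] = HadoopCompressionCodec.values().map { codec =>
  codec.lowerCaseName() -> Option(codec.getCompressionCodec).map(_.getClass.getName).orNull
}.toMap
// Seven entries: none and uncompressed map to null, the rest to Hadoop codec classes.
assert(derived("gzip") == "org.apache.hadoop.io.compress.GzipCodec")
assert(derived("none") == null)
assert(derived.size == 7)
```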
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b0a0b189cb7..d3271283baa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -2766,7 +2767,8 @@ class DataFrameSuite extends QueryTest
      // The data set has 2 partitions, so Spark will write at least 2 json files.
      // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
      // partitions.
-      .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+      .write.partitionBy("p")
+      .option("compression", GZIP.lowerCaseName()).json(path.getCanonicalPath)
val numJobs = new AtomicLong(0)
sparkContext.addSparkListener(new SparkListener {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 74043bac49a..ea90cd9cd09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
@@ -91,12 +92,15 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
}
private def saveAsCsvTable(df: DataFrameWriter[Row], dir: String): Unit = {
- df.mode("overwrite").option("compression", "gzip").option("header",
true).csv(dir)
+ df.mode("overwrite")
+ .option("compression", GZIP.lowerCaseName())
+ .option("header", true)
+ .csv(dir)
spark.read.option("header",
true).csv(dir).createOrReplaceTempView("csvTable")
}
private def saveAsJsonTable(df: DataFrameWriter[Row], dir: String): Unit = {
- df.mode("overwrite").option("compression", "gzip").json(dir)
+ df.mode("overwrite").option("compression", GZIP.lowerCaseName()).json(dir)
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a84aea27868..a2ce9b5db2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -39,7 +39,7 @@ import org.apache.logging.log4j.Level
import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.test.SharedSparkSession
@@ -874,7 +874,7 @@ abstract class CSVSuite
cars.coalesce(1).write
.format("csv")
.option("header", "true")
- .option("compression", "none")
+ .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
.options(extraOptions)
.save(csvDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f8b0a323dc..d906ae80a80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException,
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
import org.apache.spark.sql.execution.ExternalRDD
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
@@ -1689,7 +1689,7 @@ abstract class JsonSuite
val jsonDir = new File(dir, "json").getCanonicalPath
jsonDF.coalesce(1).write
.format("json")
- .option("compression", "none")
+ .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
.options(extraOptions)
.save(jsonDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ff6b9aadf7c..6e3210f8c17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.{BZIP2, DEFLATE, GZIP, NONE}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -92,7 +93,8 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
val testDf = spark.read.text(testFile)
-    val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
+    val extensionNameMap = Seq(BZIP2, DEFLATE, GZIP)
+      .map(codec => codec.lowerCaseName() -> codec.getCompressionCodec.getDefaultExtension)
extensionNameMap.foreach {
case (codecName, extension) =>
val tempDir = Utils.createTempDir()
@@ -122,7 +124,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
withTempDir { dir =>
val testDf = spark.read.text(testFile)
val tempDirPath = dir.getAbsolutePath
- testDf.write.option("compression", "none")
+ testDf.write.option("compression", NONE.lowerCaseName())
.options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -141,7 +143,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
withTempDir { dir =>
val testDf = spark.read.text(testFile)
val tempDirPath = dir.getAbsolutePath
- testDf.write.option("CoMpReSsIoN", "none")
+ testDf.write.option("CoMpReSsIoN", NONE.lowerCaseName())
.options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -166,7 +168,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
withTempDir { dir =>
val path = dir.getCanonicalPath
val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s")
- df1.write.option("compression", "gzip").mode("overwrite").text(path)
+ df1.write.option("compression",
GZIP.lowerCaseName()).mode("overwrite").text(path)
val expected = df1.collect()
Seq(10, 100, 1000).foreach { bytes =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
index f4812844cba..57e08c55874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructType}
@@ -90,7 +91,7 @@ abstract class WholeTextFileSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val path = dir.getCanonicalPath
     val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
- df1.write.option("compression", "gzip").mode("overwrite").text(path)
+ df1.write.option("compression",
GZIP.lowerCaseName()).mode("overwrite").text(path)
// On reading through wholetext mode, one file will be read as a single
row, i.e. not
// delimited by "next line" character.
     val expected = Row(df1.collect().map(_.getString(0)).mkString("", "\n", "\n"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]