This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d24c7180b39e [SPARK-52574][SQL][TESTS] Ensure compression codec is correctly applied in Hive tables and dirs
d24c7180b39e is described below
commit d24c7180b39ea73cbbc16a3e17961cfeb8848731
Author: Yuming Wang <[email protected]>
AuthorDate: Wed Jun 25 10:28:49 2025 -0700
[SPARK-52574][SQL][TESTS] Ensure compression codec is correctly applied in Hive tables and dirs
### What changes were proposed in this pull request?
This PR adds a test to verify that the compression codec specified in
`spark.sql.parquet.compression.codec` is correctly applied to Hive tables and
directories during table creation and data insertion.
### Why are the changes needed?
We add the compression codec through `setupHadoopConfForCompression` and keep it in the hadoopConf:
https://github.com/apache/spark/blob/3e0808c33f185c13808ce2d547ce9ba0057d31a6/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala#L114-L141
However, we might accidentally use `sparkSession.sessionState.newHadoopConf()` instead of the original
hadoopConf, which causes the compression codec to be lost. This PR adds a test to make sure that
does not happen.
Keep compression codec | Lost compression codec
-- | --
<img width="747" alt="image"
src="https://github.com/user-attachments/assets/67c5c3c6-a265-48c8-a77b-5d2db71af1a0"
/> | <img width="768" alt="image"
src="https://github.com/user-attachments/assets/71dcab6f-4237-4515-b942-e3531fb08899"
/>
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51279 from wangyum/SPARK-52574.
Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/hive/HiveParquetSuite.scala | 42 ++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 6d7248a7dd67..8fd37234f08e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -17,10 +17,15 @@
package org.apache.spark.sql.hive
+import java.io.File
import java.time.{Duration, Period}
import java.time.temporal.ChronoUnit
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -179,4 +184,41 @@ class HiveParquetSuite extends QueryTest
}
}
}
+
+  test("SPARK-52574: Ensure compression codec is correctly applied in Hive tables and dirs") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false",
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
+      withTable("tbl") {
+        sql("CREATE TABLE tbl(id int) STORED AS PARQUET")
+        sql("INSERT INTO tbl SELECT id AS part FROM range(10)")
+        val tblMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
+        checkCompressionCodec(new File(tblMeta.storage.locationUri.get))
+      }
+
+      withTempPath { dir =>
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${dir.getCanonicalPath}'
+             |STORED AS parquet
+             |SELECT id FROM range(10)
+             |""".stripMargin)
+        checkCompressionCodec(dir)
+      }
+    }
+
+    def checkCompressionCodec(dir: File): Unit = {
+      val parquetFiles = dir.listFiles().filter(_.getName.startsWith("part-"))
+      assert(parquetFiles.nonEmpty, "No Parquet files found")
+
+      val conf = spark.sessionState.newHadoopConf()
+      val file = parquetFiles.head
+      val footer = ParquetFileReader.readFooter(conf, new Path(file.getAbsolutePath))
+
+      val codec = footer.getBlocks.get(0).getColumns.get(0).getCodec.name()
+      assert(codec.equalsIgnoreCase(ParquetCompressionCodec.SNAPPY.lowerCaseName()),
+        s"Expected ${ParquetCompressionCodec.SNAPPY.lowerCaseName()} compression but found $codec")
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]