This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d24c7180b39e [SPARK-52574][SQL][TESTS] Ensure compression codec is correctly applied in Hive tables and dirs
d24c7180b39e is described below

commit d24c7180b39ea73cbbc16a3e17961cfeb8848731
Author: Yuming Wang <[email protected]>
AuthorDate: Wed Jun 25 10:28:49 2025 -0700

    [SPARK-52574][SQL][TESTS] Ensure compression codec is correctly applied in Hive tables and dirs
    
    ### What changes were proposed in this pull request?
    
    This PR adds a test to verify that the compression codec specified in `spark.sql.parquet.compression.codec` is correctly applied to Hive tables and directories during table creation and data insertion.
    
    ### Why are the changes needed?
    
    We add the compression codec through `setupHadoopConfForCompression` and keep it in the hadoopConf:
    
    https://github.com/apache/spark/blob/3e0808c33f185c13808ce2d547ce9ba0057d31a6/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala#L114-L141
    
    However, a later code path might accidentally use `sparkSession.sessionState.newHadoopConf()` instead of the original hadoopConf, causing the compression codec to be lost. This PR adds a test to make sure that does not happen.
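    
    For illustration, here is a minimal sketch of the failure mode. This is not the actual Spark source (the real logic lives in `V1WritesHiveUtils.setupHadoopConfForCompression`, linked above); names and the codec value are simplified:
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    
    // Sketch only: the write path stores the codec in one particular
    // Configuration instance, so that same instance must be the one the
    // files are eventually written with.
    def setupHadoopConfForCompression(hadoopConf: Configuration): Unit = {
      // "parquet.compression" is the Parquet output key; "snappy" mirrors
      // spark.sql.parquet.compression.codec in this example.
      hadoopConf.set("parquet.compression", "snappy")
    }
    
    val hadoopConf = new Configuration()
    setupHadoopConfForCompression(hadoopConf)
    assert(hadoopConf.get("parquet.compression") == "snappy") // codec kept
    
    // The bug: building a fresh conf, e.g. via
    // sparkSession.sessionState.newHadoopConf(), drops the key, and the
    // write silently falls back to the default codec.
    val freshConf = new Configuration()
    assert(freshConf.get("parquet.compression") == null) // codec lost
    ```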
    
    Keep compression codec | Lost compression codec
    -- | --
    <img width="747" alt="image" src="https://github.com/user-attachments/assets/67c5c3c6-a265-48c8-a77b-5d2db71af1a0" /> | <img width="768" alt="image" src="https://github.com/user-attachments/assets/71dcab6f-4237-4515-b942-e3531fb08899" />
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test.
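    
    (The new test can also be run on its own, e.g. with `build/sbt "hive/testOnly org.apache.spark.sql.hive.HiveParquetSuite"`, assuming the standard Spark sbt dev setup.)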
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51279 from wangyum/SPARK-52574.
    
    Lead-authored-by: Yuming Wang <[email protected]>
    Co-authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/hive/HiveParquetSuite.scala   | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 6d7248a7dd67..8fd37234f08e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
 import java.time.{Duration, Period}
 import java.time.temporal.ChronoUnit
 
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -179,4 +184,41 @@ class HiveParquetSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-52574: Ensure compression codec is correctly applied in Hive 
tables and dirs") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false",
+      SQLConf.PARQUET_COMPRESSION.key -> 
ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
+      withTable("tbl") {
+        sql("CREATE TABLE tbl(id int) STORED AS PARQUET")
+        sql("INSERT INTO tbl SELECT id AS part FROM range(10)")
+        val tblMata = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
+        checkCompressionCodec(new File(tblMata.storage.locationUri.get))
+      }
+
+      withTempPath { dir =>
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${dir.getCanonicalPath}'
+             |STORED AS parquet
+             |SELECT id FROM range(10)
+             |""".stripMargin)
+        checkCompressionCodec(dir)
+      }
+    }
+
+    def checkCompressionCodec(dir: File): Unit = {
+      val parquetFiles = dir.listFiles().filter(_.getName.startsWith("part-"))
+      assert(parquetFiles.nonEmpty, "No Parquet files found")
+
+      val conf = spark.sessionState.newHadoopConf()
+      val file = parquetFiles.head
+      val footer = ParquetFileReader.readFooter(conf, new 
Path(file.getAbsolutePath))
+
+      val codec = footer.getBlocks.get(0).getColumns.get(0).getCodec.name()
+      
assert(codec.equalsIgnoreCase(ParquetCompressionCodec.SNAPPY.lowerCaseName()),
+        s"Expected ${ParquetCompressionCodec.SNAPPY.lowerCaseName()} 
compression but found $codec")
+    }
+  }
 }
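
Side note on the footer check above: `ParquetFileReader.readFooter(Configuration, Path)` is deprecated in recent parquet-hadoop releases. A sketch of the equivalent read through the non-deprecated `InputFile`-based API (same column-chunk codec lookup as the test) might look like:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the file, read its footer, and return the compression codec of the
// first column chunk of the first row group.
def firstColumnCodec(conf: Configuration, file: Path): String = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))
  try {
    reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.name()
  } finally {
    reader.close()
  }
}
```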


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
