huaxingao commented on code in PR #11257:
URL: https://github.com/apache/iceberg/pull/11257#discussion_r1794322243


##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java:
##########
@@ -108,14 +108,14 @@ public static Object[][] parameters() {
         SparkCatalogConfig.SPARK.implementation(),
         SparkCatalogConfig.SPARK.properties(),
         PARQUET,
-        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")

Review Comment:
   The reason I wanted to move `ImmutableMap.of(COMPRESSION_CODEC, "zstd", 
COMPRESSION_LEVEL, "1")` after gzip is that the new Hadoop version uses 
`CompressionLevel` to initialize a GzipCompressor, and the COMPRESSION_LEVEL 
of "1" set for zstd is carried over to the gzip case. However, "1" is not a 
valid `CompressionLevel` enum constant for gzip, so it throws an exception:
   ```
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 7) (192.168.50.141 executor driver): java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.1
        at java.base/java.lang.Enum.valueOf(Enum.java:273)
        at org.apache.hadoop.conf.Configuration.getEnum(Configuration.java:1786)
        at org.apache.hadoop.io.compress.zlib.ZlibFactory.getCompressionLevel(ZlibFactory.java:165)
        at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.init(BuiltInGzipCompressor.java:157)
        at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.<init>(BuiltInGzipCompressor.java:67)
        at org.apache.hadoop.io.compress.GzipCodec.createCompressor(GzipCodec.java:64)
        at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
        at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:157)
        at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
        at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
        at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)
   ```
   I thought this over: rather than switching the order, it's better to unset 
COMPRESSION_CODEC and COMPRESSION_LEVEL for each test.
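
   To illustrate the failure mode and why unsetting helps, here is a minimal, self-contained sketch. It does not use Hadoop or Spark: the `CompressionLevel` enum below is a local stand-in (its constant names are my recollection of Hadoop's `ZlibCompressor.CompressionLevel` and should be checked against the Hadoop source), `LEVEL_KEY` is a hypothetical key name, and the plain `Map` stands in for the shared configuration. The lookup follows the enum-by-name resolution seen in the stack trace (`Enum.valueOf`).

```java
import java.util.HashMap;
import java.util.Map;

public class GzipLevelSketch {
  // Local stand-in for Hadoop's ZlibCompressor.CompressionLevel (names assumed, not verified here).
  enum CompressionLevel {
    NO_COMPRESSION, BEST_SPEED, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT,
    BEST_COMPRESSION, DEFAULT_COMPRESSION
  }

  // Hypothetical key; the real property name is not taken from the PR.
  static final String LEVEL_KEY = "compression-level";

  // Resolve the level by enum name, falling back to a default when the key is unset.
  static CompressionLevel resolveLevel(Map<String, String> conf) {
    String value = conf.get(LEVEL_KEY);
    return value == null ? CompressionLevel.DEFAULT_COMPRESSION : CompressionLevel.valueOf(value);
  }

  // True when the configured value cannot be mapped to an enum constant.
  static boolean failsToResolve(Map<String, String> conf) {
    try {
      resolveLevel(conf);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();

    // A numeric level left over from the zstd case is not an enum constant name.
    conf.put(LEVEL_KEY, "1");
    System.out.println("leftover level fails: " + failsToResolve(conf));

    // Unsetting the key before the gzip case lets the default apply.
    conf.remove(LEVEL_KEY);
    System.out.println("after unset: " + resolveLevel(conf));
  }
}
```

   In the actual test, the equivalent step would be clearing both settings from the shared session configuration before or after each test case, so that no case inherits values from the previous one regardless of ordering.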



##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java:
##########
@@ -302,4 +303,12 @@ private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encod
   private <T> void send(List<T> records, MemoryStream<T> stream) {
     stream.addData(JavaConverters.asScalaBuffer(records));
   }
+
+  private void deleteFileAndCrc(File file) throws IOException {
+    File crcFile = new File(file.getParent(), "." + file.getName() + ".crc");

Review Comment:
   `file.getAbsolutePath() + ".crc"` gives
   ```
   /var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/1.crc
   ```
   but I need the hidden checksum file, which has a leading dot in its name:
   ```
   /var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/.1.crc
   ```
   
   When deleting only the commit file, like this:
   ```
         // remove the last commit to force Spark to reprocess batch #1
         File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
         assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue();
   ```
   I got the exception below, because the stale `.1.crc` file is left behind. 
   So I delete the crc file manually as well.
   ```
    Caused by:
               org.apache.hadoop.fs.FileAlreadyExistsException: Rename destination file:/var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit16535101756839742517/parquet/checkpoint/commits/.1.crc already exists.
                   at app//org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:876)
                   at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
                   at app//org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:519)
                   at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
                   at app//org.apache.hadoop.fs.FileContext.rename(FileContext.java:1044)
                   at app//org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:376)
                   at app//org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:156)
                   at app//org.apache.spark.sql.execution.streaming.HDFSMetadataLog.write(HDFSMetadataLog.scala:207)
   ```
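
   For reference, a minimal standalone sketch of the cleanup described above. Only the `crcFile` path expression comes from the diff; the rest of `deleteFileAndCrc`, the `crcFileFor` helper, and the class name are hypothetical and may differ from what the PR actually does. It uses only `java.io` and `java.nio.file`, and simulates the checksum file with an empty file.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CrcCleanupSketch {
  // Checksum sibling: same directory, name prefixed with "." and suffixed with ".crc".
  static File crcFileFor(File file) {
    return new File(file.getParent(), "." + file.getName() + ".crc");
  }

  // Delete the file itself (failing loudly if it is missing), then its checksum sibling if present.
  static void deleteFileAndCrc(File file) throws IOException {
    Files.delete(file.toPath());
    Files.deleteIfExists(crcFileFor(file).toPath());
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("commits").toFile();
    File commit = new File(dir, "1");
    File crc = crcFileFor(commit);

    // Simulate a commit file and its checksum file.
    Files.createFile(commit.toPath());
    Files.createFile(crc.toPath());

    System.out.println(crc.getName());                        // prints ".1.crc"
    deleteFileAndCrc(commit);
    System.out.println(commit.exists() + " " + crc.exists()); // prints "false false"
  }
}
```

   Removing both files together means a later rename onto `commits/.1.crc` has no stale destination to collide with.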



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

