huaxingao commented on code in PR #11257: URL: https://github.com/apache/iceberg/pull/11257#discussion_r1794322243
########## spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java: ########## @@ -108,14 +108,14 @@ public static Object[][] parameters() { SparkCatalogConfig.SPARK.implementation(), SparkCatalogConfig.SPARK.properties(), PARQUET, - ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1") Review Comment: The reason I wanted to `ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")` after gzip is that the new Hadoop version uses `CompressionLevel` to initialize a GzipCompressor, and this COMPRESSION_LEVEL, "1", is carried over to gzip. However, "1" is not a valid compression level for gzip, so it throws an exception. ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 7) (192.168.50.141 executor driver): java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.1 at java.base/java.lang.Enum.valueOf(Enum.java:273) at org.apache.hadoop.conf.Configuration.getEnum(Configuration.java:1786) at org.apache.hadoop.io.compress.zlib.ZlibFactory.getCompressionLevel(ZlibFactory.java:165) at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.init(BuiltInGzipCompressor.java:157) at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.<init>(BuiltInGzipCompressor.java:67) at org.apache.hadoop.io.compress.GzipCodec.createCompressor(GzipCodec.java:64) at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152) at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168) at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:157) at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219) at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202) at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90) at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360) ``` I think this over; rather than switching the order, it's better to unset the COMPRESSION_CODEC and COMPRESSION_LEVEL for each test. ########## spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java: ########## @@ -302,4 +303,12 @@ private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encod private <T> void send(List<T> records, MemoryStream<T> stream) { stream.addData(JavaConverters.asScalaBuffer(records)); } + + private void deleteFileAndCrc(File file) throws IOException { + File crcFile = new File(file.getParent(), "." + file.getName() + ".crc"); Review Comment: file.getAbsolutePath + ".crc" has ``` /var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/1.crc ``` but I need ``` /var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit3203001525697015981/parquet/checkpoint/commits/.1.crc ``` When deleting the commits like ``` // remove the last commit to force Spark to reprocess batch #1 File lastCommitFile = new File(checkpoint.toString() + "/commits/1"); assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue(); ``` I got Exception so I manually delete the crc file ``` Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Rename destination file:/var/folders/lp/nhx1yjj90qb54xq7v_grpl9c0000gn/T/junit16535101756839742517/parquet/checkpoint/commits/.1.crc already exists. at app//org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:876) at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807) at app//org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:519) at app//org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807) at app//org.apache.hadoop.fs.FileContext.rename(FileContext.java:1044) at app//org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:376) at app//org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:156) at app//org.apache.spark.sql.execution.streaming.HDFSMetadataLog.write(HDFSMetadataLog.scala:207) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org