wypoon commented on code in PR #6026: URL: https://github.com/apache/iceberg/pull/6026#discussion_r1003542296
########## spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java: ########## @@ -508,6 +530,75 @@ public void testIsDeletedColumnWithoutDeleteFile() { checkDeleteCount(0L); } + @Test + public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException { + Assume.assumeTrue(format.equals("parquet")); + + String tblName = "test3"; + Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned()); + + List<Path> fileSplits = Lists.newArrayList(); + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + Configuration conf = new Configuration(); + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + Path testFilePath = new Path(testFile.getAbsolutePath()); + + // Write a Parquet file with more than one row group + ParquetFileWriter parquetFileWriter = + new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath); + parquetFileWriter.start(); + for (int i = 0; i < 2; i += 1) { + File split = temp.newFile(); + Assert.assertTrue("Delete should succeed", split.delete()); + Path splitPath = new Path(split.getAbsolutePath()); + fileSplits.add(splitPath); + try (FileAppender<InternalRow> writer = + Parquet.write(Files.localOutput(split)) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType)) + .schema(SCHEMA) + .overwrite() + .build()) { + Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37); + writer.addAll(records); + } + parquetFileWriter.appendFile( + org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf)); + } + parquetFileWriter.end( + ParquetFileWriter.mergeMetadataFiles(fileSplits, conf) + .getFileMetaData() + .getKeyValueMetaData()); + + // Add the file to the table + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf)) + .withFormat("parquet") + .withRecordCount(200) + .build(); + tbl.newAppend().appendFile(dataFile).commit(); + + // Add positional deletes to the table + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of(dataFile.path(), 97L), + Pair.of(dataFile.path(), 98L), + Pair.of(dataFile.path(), 99L), + Pair.of(dataFile.path(), 101L), + Pair.of(dataFile.path(), 103L), + Pair.of(dataFile.path(), 107L), + Pair.of(dataFile.path(), 109L)); + Pair<DeleteFile, CharSequenceSet> posDeletes = + FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes); + tbl.newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + + Assert.assertEquals(193, rowSet(tblName, tbl, "*").size()); Review Comment: Without the fix, this assertion fails for the vectorized case. There are 3 deletes applied to the first row group and 4 deletes applied to the second row group. Without the fix, the 3 deletes for the first row group are applied to the second as well (instead of the 4 that should be applied). Thus 6 rows are deleted (instead of 7) and the result is 194 rows, instead of the expected 193. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org