ajantha-bhat commented on code in PR #9020: URL: https://github.com/apache/iceberg/pull/9020#discussion_r1388776171
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java: ##########

@@ -649,6 +659,243 @@ public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws IOExce
 assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
 }
+ @Test
+ public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ // commit data records
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"),
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"));
+ writeRecords(records);
+
+ // commit a position delete file to remove records where c1 = 1 OR c1 = 2
+ List<Pair<CharSequence, Long>> posDeletes = generatePosDeletes("c1 = 1 OR c1 = 2");
+ Pair<DeleteFile, CharSequenceSet> posDeleteWriteResult = writePosDeletes(table, posDeletes);
+ table
+ .newRowDelta()
+ .addDeletes(posDeleteWriteResult.first())
+ .validateDataFilesExist(posDeleteWriteResult.second())
+ .commit();
+
+ // commit an equality delete file to remove all records where c1 = 3
+ DeleteFile eqDeleteFile = writeEqDeletes(table, "c1", 3);
+ table.newRowDelta().addDeletes(eqDeleteFile).commit();
+
+ // the current snapshot should contain 1 data manifest and 2 delete manifests
+ List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+ assertThat(originalManifests).hasSize(3);
+
+ SparkActions actions = SparkActions.get();
+
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
+
+ // the original delete manifests must be combined
+ assertThat(result.rewrittenManifests()).hasSize(2);

Review Comment:
   should we also validate that these manifests' content type is delete? (see the sketch below)

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java: ##########

@@ -264,14 +293,15 @@ private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
 }
 }
- private List<ManifestFile> findMatchingManifests() {
+ private List<ManifestFile> findMatchingManifests(ManifestContent content) {
 Snapshot currentSnapshot = table.currentSnapshot();
 if (currentSnapshot == null) {
 return ImmutableList.of();
 }
- return currentSnapshot.dataManifests(table.io()).stream()
+ return currentSnapshot.allManifests(table.io()).stream()
+ .filter(manifest -> manifest.content() == content)

Review Comment:
   nit: can we combine this filter condition with the one below? (see the sketch below)
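For the first comment above (validating the manifest type), a minimal sketch of the extra assertion, assuming the AssertJ style already used in `TestRewriteManifestsAction` and that `org.apache.iceberg.ManifestContent` is imported:

```java
// Hedged sketch: besides the count, assert that every rewritten manifest is a
// delete manifest. ManifestFile.content() returns the manifest's content type.
assertThat(result.rewrittenManifests())
    .hasSize(2)
    .allMatch(manifest -> manifest.content() == ManifestContent.DELETES);
```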
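For the nit on `findMatchingManifests`, a hedged sketch of folding the content check into a single filter. `matchesTargetSpecAndSize` is a hypothetical stand-in for the existing condition the method applies below the quoted hunk, which the diff does not show:

```java
// Hedged sketch: one combined predicate instead of two chained .filter() calls.
// matchesTargetSpecAndSize is a placeholder for the method's existing condition.
return currentSnapshot.allManifests(table.io()).stream()
    .filter(manifest -> manifest.content() == content && matchesTargetSpecAndSize(manifest))
    .collect(Collectors.toList());
```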
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java: ##########

@@ -159,34 +169,49 @@ public RewriteManifests.Result execute() {
 }
 private RewriteManifests.Result doExecute() {
- List<ManifestFile> matchingManifests = findMatchingManifests();
+ List<ManifestFile> rewrittenManifests = Lists.newArrayList();
+ List<ManifestFile> addedManifests = Lists.newArrayList();
+
+ Result dataResult = rewriteManifests(ManifestContent.DATA);
+ Iterables.addAll(rewrittenManifests, dataResult.rewrittenManifests());
+ Iterables.addAll(addedManifests, dataResult.addedManifests());
+
+ Result deletesResult = rewriteManifests(ManifestContent.DELETES);
+ Iterables.addAll(rewrittenManifests, deletesResult.rewrittenManifests());
+ Iterables.addAll(addedManifests, deletesResult.addedManifests());
+
+ if (rewrittenManifests.isEmpty() && addedManifests.isEmpty()) {

Review Comment:
   Functionally it is fine. It checks whether calling `rewriteManifests` for both data and deletes returned `EMPTY_RESULT`; if so, the method returns an empty result. But is checking just `rewrittenManifests.isEmpty()` enough (instead of both)? Because how can we add a new manifest when `rewrittenManifests` is empty? (a sketch of this simplification is included at the end of this message)

########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkContentFiles.java: ##########

@@ -161,40 +173,132 @@ private void checkSparkDataFile(Table table) throws IOException {
 table.refresh();
+ PartitionSpec dataFilesSpec = table.spec();
+
 List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
 Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 List<DataFile> dataFiles = Lists.newArrayList();
 try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
 for (DataFile dataFile : reader) {
- checkDataFile(dataFile.copy(), DataFiles.builder(table.spec()).copy(dataFile).build());
+ checkDataFile(dataFile.copy(), DataFiles.builder(dataFilesSpec).copy(dataFile).build());
 dataFiles.add(dataFile.copy());
 }
 }
- Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#files");
+ UpdatePartitionSpec updateSpec = table.updateSpec();
+ for (PartitionField field : dataFilesSpec.fields()) {
+ updateSpec.removeField(field.name());
+ }
+ updateSpec.commit();
- // reorder columns to test arbitrary projections
- List<Column> columns =
- Arrays.stream(dataFileDF.columns()).map(ColumnName::new).collect(Collectors.toList());
- Collections.shuffle(columns);
+ List<DeleteFile> positionDeleteFiles = Lists.newArrayList();
+ List<DeleteFile> equalityDeleteFiles = Lists.newArrayList();
+
+ RowDelta rowDelta = table.newRowDelta();
+
+ for (DataFile dataFile : dataFiles) {
+ DeleteFile positionDeleteFile = createPositionDeleteFile(table, dataFile);
+ positionDeleteFiles.add(positionDeleteFile);
+ rowDelta.addDeletes(positionDeleteFile);
+ }
- List<Row> sparkDataFiles =
- dataFileDF.select(Iterables.toArray(columns, Column.class)).collectAsList();
+ DeleteFile equalityDeleteFile1 = createEqualityDeleteFile(table);
+ equalityDeleteFiles.add(equalityDeleteFile1);
+ rowDelta.addDeletes(equalityDeleteFile1);
+ DeleteFile equalityDeleteFile2 = createEqualityDeleteFile(table);
+ equalityDeleteFiles.add(equalityDeleteFile2);
+ rowDelta.addDeletes(equalityDeleteFile2);
+
+ rowDelta.commit();
+
+ Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#data_files");
+ List<Row> sparkDataFiles = shuffleColumns(dataFileDF).collectAsList();
 Assert.assertEquals(
 "The number of files should match", dataFiles.size(), sparkDataFiles.size());
- Types.StructType dataFileType = DataFile.getType(table.spec().partitionType());
+ Types.StructType dataFileType = DataFile.getType(dataFilesSpec.partitionType());
 StructType sparkDataFileType = sparkDataFiles.get(0).schema();
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkDataFileType);
+ SparkDataFile dataFileWrapper = new SparkDataFile(dataFileType, sparkDataFileType);
 for (int i = 0; i < dataFiles.size(); i++) {
- checkDataFile(dataFiles.get(i), wrapper.wrap(sparkDataFiles.get(i)));
+ checkDataFile(dataFiles.get(i), dataFileWrapper.wrap(sparkDataFiles.get(i)));
+ }
+
+ Dataset<Row> positionDeleteFileDF =
+ spark.read().format("iceberg").load(tableLocation + "#delete_files").where("content = 1");
+ List<Row> sparkPositionDeleteFiles = shuffleColumns(positionDeleteFileDF).collectAsList();
+ Assert.assertEquals(
+ "The number of files should match",
+ positionDeleteFiles.size(),
+ sparkPositionDeleteFiles.size());
+
+ Types.StructType positionDeleteFileType = DataFile.getType(dataFilesSpec.partitionType());
+ StructType sparkPositionDeleteFileType = sparkPositionDeleteFiles.get(0).schema();
+ SparkDeleteFile positionDeleteFileWrapper =
+ new SparkDeleteFile(positionDeleteFileType, sparkPositionDeleteFileType);
+
+ for (int i = 0; i < positionDeleteFiles.size(); i++) {
+ checkDeleteFile(
+ positionDeleteFiles.get(i),
+ positionDeleteFileWrapper.wrap(sparkPositionDeleteFiles.get(i)));
+ }
+
+ Dataset<Row> equalityDeleteFileDF =
+ spark.read().format("iceberg").load(tableLocation + "#delete_files").where("content = 2");
+ List<Row> sparkEqualityDeleteFiles = shuffleColumns(equalityDeleteFileDF).collectAsList();
+ Assert.assertEquals(
+ "The number of files should match",
+ equalityDeleteFiles.size(),
+ sparkEqualityDeleteFiles.size());
+
+ Types.StructType equalityDeleteFileType = DataFile.getType(table.spec().partitionType());
+ StructType sparkEqualityDeleteFileType = sparkEqualityDeleteFiles.get(0).schema();
+ SparkDeleteFile equalityDeleteFileWrapper =
+ new SparkDeleteFile(equalityDeleteFileType, sparkEqualityDeleteFileType);
+
+ for (int i = 0; i < equalityDeleteFiles.size(); i++) {
+ checkDeleteFile(
+ equalityDeleteFiles.get(i),
+ equalityDeleteFileWrapper.wrap(sparkEqualityDeleteFiles.get(i)));
 }
 }
+ private Dataset<Row> shuffleColumns(Dataset<Row> df) {
+ List<Column> columns =
+ Arrays.stream(df.columns()).map(ColumnName::new).collect(Collectors.toList());
+ Collections.shuffle(columns);
+ return df.select(columns.toArray(new Column[0]));
+ }
+
 private void checkDataFile(DataFile expected, DataFile actual) {
+ Assert.assertEquals("Content must match", expected.content(), actual.content());

Review Comment:
   nit: we could add a `checkContentFile` helper and extract the common assertions into it (a sketch is included at the end of this message)

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java: ##########

@@ -215,41 +240,45 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
 .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
 }
- private List<ManifestFile> writeManifestsForUnpartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
-
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
+ private List<ManifestFile> writeUnpartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
- return manifestEntryDF
- .repartition(numManifests)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content, manifestEntryDF.schema());
+ Dataset<Row> transformedManifestEntryDF = manifestEntryDF.repartition(numManifests);
+ return writeFunc.apply(transformedManifestEntryDF).collectAsList();
 }
- private List<ManifestFile> writeManifestsForPartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
-
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
+ private List<ManifestFile> writePartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 return withReusableDS(
 manifestEntryDF,
 df -> {
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
 Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+ return writeFunc.apply(transformedDF).collectAsList();
 });
 }
+ private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, StructType sparkType) {
+ ManifestWriterFactory writers = manifestWriters();
+
+ StructType sparkFileType = (StructType) sparkType.apply("data_file").dataType();

Review Comment:
   I expected a check based on the `content` type, using `delete_file` or `data_file` instead of always using `data_file`. But it looks like the schema always keeps it as `data_file`, even for delete files?
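For the comment on `newWriteManifestsFunc`, a hedged sketch of the alternative it anticipated: choosing the struct column by manifest content. The PR appears to keep the column named `data_file` for delete manifests as well, so this only illustrates the suggestion, not the actual change:

```java
// Hedged sketch: pick the file column name by content type. Assumes the manifest
// entry schema would expose a "delete_file" struct for delete manifests, which the
// quoted hunk suggests is not the case today.
String fileColumnName = content == ManifestContent.DELETES ? "delete_file" : "data_file";
StructType sparkFileType = (StructType) sparkType.apply(fileColumnName).dataType();
```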
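For the `doExecute` comment above, a minimal sketch of the suggested simplification, assuming `EMPTY_RESULT` is the constant the comment refers to (its definition is not shown in the hunk):

```java
// Hedged sketch: if neither the data pass nor the delete pass rewrote anything,
// nothing can have been added either, so checking the rewritten list alone suffices.
if (rewrittenManifests.isEmpty()) {
  return EMPTY_RESULT;
}
```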
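For the `checkContentFile` nit on `TestSparkContentFiles`, a hedged sketch of the refactor. Only the content assertion is visible in the quoted hunk, so the other shared assertions here (`path`, `format`, `recordCount`) are assumptions about what `checkDataFile` and `checkDeleteFile` have in common:

```java
// Hedged sketch: pull assertions shared by checkDataFile and checkDeleteFile into a
// common helper that works on the ContentFile interface.
private void checkContentFile(ContentFile<?> expected, ContentFile<?> actual) {
  Assert.assertEquals("Content must match", expected.content(), actual.content());
  Assert.assertEquals("Path must match", expected.path(), actual.path());
  Assert.assertEquals("Format must match", expected.format(), actual.format());
  Assert.assertEquals("Record count must match", expected.recordCount(), actual.recordCount());
}

private void checkDataFile(DataFile expected, DataFile actual) {
  checkContentFile(expected, actual);
  // data-file specific assertions continue here
}
```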