aokolnychyi commented on code in PR #9724:
URL: https://github.com/apache/iceberg/pull/9724#discussion_r1498496206
########## api/src/main/java/org/apache/iceberg/DeleteFiles.java: ##########
@@ -55,6 +55,17 @@ default DeleteFiles deleteFile(DataFile file) {
     return this;
   }
 
+  /**
+   * Delete a file tracked by a {@link DeleteFile} from the underlying table.
+   *
+   * @param file a DeleteFile to remove from the table
+   * @return this for method chaining
+   */
+  default DeleteFiles deleteFile(DeleteFile file) {

Review Comment:
   Are we sure this is the best API we can use? The new snapshot will be of the `DELETE` type, indicating data was removed from the table. Why not use `RewriteFiles` instead, which already supports what we need?

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java: ##########
@@ -64,19 +65,25 @@ import org.apache.iceberg.spark.JobGroupUtils;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class BaseSparkAction<ThisT> {
 
+  public static final String USE_CACHING = "use-caching";

Review Comment:
   This class isn't public, so I don't think we should put public constants here.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java: ##########
@@ -435,4 +442,25 @@ static FileInfo toFileInfo(ContentFile<?> file) {
       return new FileInfo(file.path().toString(), file.content().toString());
     }
   }
+
+  protected <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {

Review Comment:
   Something is off here. This logic was probably borrowed from `RewriteManifestsSparkAction`, but that logic has changed recently; this snippet is out of date.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java: ##########
@@ -278,20 +277,6 @@ private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPart
     return df.repartitionByRange(numPartitions, col).sortWithinPartitions(col);
   }
 
-  private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {

Review Comment:
   I don't think this should be moved and used for this work.

########## api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java: ##########
@@ -99,6 +100,24 @@ public interface RewriteDataFiles
 
   boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
 
+  /**
+   * Remove dangling delete files from current snapshot after compaction. A delete file is

Review Comment:
   Minor: `from current snapshot` -> `from the current snapshot`.
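[Editor's note] A minimal sketch of the `RewriteFiles` alternative raised in the first comment above, assuming `table` is the `org.apache.iceberg.Table` being compacted and `danglingDeletes` holds the delete files to drop:

```java
// Sketch only: dropping dangling delete files through RewriteFiles.
// Nothing is added back, so the commit produces a REPLACE snapshot
// rather than a DELETE snapshot.
RewriteFiles rewriteFiles = table.newRewrite();
for (DeleteFile deleteFile : danglingDeletes) {
  rewriteFiles.deleteFile(deleteFile);
}
rewriteFiles.commit();
```

Because `RewriteFiles` commits a `REPLACE` snapshot, readers would not interpret the operation as data being removed from the table, which is the snapshot-type concern above.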
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java: ##########
@@ -323,10 +350,117 @@ private Result doExecute(
     List<FileGroupRewriteResult> rewriteResults =
         rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
 
-    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
+  }
+
+  private List<DeleteFile> removeDanglingDeletes() {

Review Comment:
   The logic here is non-trivial and is off by default. I'd probably move it into a separate action; the existing action is already complicated. In that case, I am not sure we even have to call it from rewrite data files. Since we ask the user to pass a property explicitly anyway, I'd prefer to separate the two actions and have a dedicated procedure.

########## api/src/main/java/org/apache/iceberg/RemoveDanglingDeletesMode.java: ##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Enum of supported remove dangling deletes modes; it defines the mode in which dangling delete
+ * files shall be pruned.
+ *
+ * <ul>
+ *   <li>If remove-dangling-deletes=metadata, dangling delete files will be pruned from Iceberg
+ *       metadata. Pruning applies to both position delete and equality delete files.
+ *   <li>If remove-dangling-deletes=none, pruning is disabled.
+ * </ul>
+ */
+public enum RemoveDanglingDeletesMode {

Review Comment:
   I am not sure we need this enum, to be honest. The decision to use partition stats instead of scanning metadata should be made by Iceberg, not by users: if we detect a viable partition stats file, we should always use it instead of scanning the metadata. Also, the `FULL` mode seems a bit awkward, as it would actually rewrite deletes rather than drop dangling ones. I'd not add it for now and see whether we want to reconsider this decision later.
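[Editor's note] To illustrate the "separate action with a dedicated procedure" idea from the first comment above, the call site could look roughly like this; `removeDanglingDeleteFiles` and its `Result` type are hypothetical names, not an existing API:

```java
// Hypothetical standalone action, sketched only to show the proposed shape;
// removeDanglingDeleteFiles() does not exist in SparkActions today.
RemoveDanglingDeleteFiles.Result result =
    SparkActions.get(spark)
        .removeDanglingDeleteFiles(table)
        .execute();
```

A matching stored procedure (something like `CALL catalog.system.remove_dangling_delete_files(...)`, also hypothetical) would then expose the cleanup without tying it to compaction.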
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java: ##########
@@ -323,10 +350,117 @@ private Result doExecute(
     List<FileGroupRewriteResult> rewriteResults =
         rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
 
-    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
+  }
+
+  private List<DeleteFile> removeDanglingDeletes() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide on each commit
+      return Collections.emptyList();
+    }
+
+    DeleteFiles deleteFiles = table.newDelete();
+    List<DeleteFile> toRemove = Lists.newArrayList();
+    LOG.info("Evaluating dangling delete files for {}", table.name());
+    Dataset<Row> entries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            .filter("status < 2") // live entries
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "data_file.file_path as file_path",
+                "data_file.content as content",
+                "data_file.file_size_in_bytes as file_size_in_bytes",
+                "data_file.record_count as record_count",
+                "sequence_number");
+
+    toRemove.addAll(withReusableDS(entries, this::danglingDeletes));
+    toRemove.forEach(f -> LOG.debug("Removing dangling delete file {}", f.path()));

Review Comment:
   Can we do a simple `for (DeleteFile deleteFile : danglingDeletes) {}` here instead of two passes?

########## api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java: ##########
@@ -99,6 +100,24 @@ public interface RewriteDataFiles
 
   boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
 
+  /**
+   * Remove dangling delete files from current snapshot after compaction. A delete file is

Review Comment:
   Are we looking at data sequence numbers here? If so, we have to be specific in the comment. Let me check the implementation and come back.
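[Editor's note] A small sketch of the single-pass loop suggested in the two-passes comment above, reusing `entries`, `deleteFiles`, and `withReusableDS` from the quoted diff:

```java
// One traversal instead of two forEach passes: log and register each
// dangling delete file as it is visited.
List<DeleteFile> danglingDeletes = withReusableDS(entries, this::danglingDeletes);
for (DeleteFile deleteFile : danglingDeletes) {
  LOG.debug("Removing dangling delete file {}", deleteFile.path());
  deleteFiles.deleteFile(deleteFile); // the deleteFile(DeleteFile) overload this PR adds
}
```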
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java: ##########
@@ -323,10 +350,117 @@ private Result doExecute(
     List<FileGroupRewriteResult> rewriteResults =
         rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
 
-    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
+  }
+
+  private List<DeleteFile> removeDanglingDeletes() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide on each commit
+      return Collections.emptyList();
+    }
+
+    DeleteFiles deleteFiles = table.newDelete();
+    List<DeleteFile> toRemove = Lists.newArrayList();
+    LOG.info("Evaluating dangling delete files for {}", table.name());
+    Dataset<Row> entries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            .filter("status < 2") // live entries
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "data_file.file_path as file_path",
+                "data_file.content as content",
+                "data_file.file_size_in_bytes as file_size_in_bytes",
+                "data_file.record_count as record_count",
+                "sequence_number");
+
+    toRemove.addAll(withReusableDS(entries, this::danglingDeletes));
+    toRemove.forEach(f -> LOG.debug("Removing dangling delete file {}", f.path()));
+    toRemove.forEach(deleteFiles::deleteFile);
+
+    if (!toRemove.isEmpty()) {
+      commit(deleteFiles);
+    }
+
+    return toRemove;
+  }
+
+  private List<DeleteFile> danglingDeletes(Dataset<Row> entries) {
+    List<DeleteFile> removedDeleteFiles = Lists.newArrayList();
+
+    // Minimum sequence number of data files in each partition
+    Dataset<Row> minDataSeqNumberPerPartition =
+        entries
+            .filter("content == 0") // data files
+            .groupBy("partition", "spec_id")
+            .agg(min("sequence_number"))
+            .toDF("partition", "spec_id", "min_data_sequence_number")
+            .cache();
+
+    // Dangling position delete files
+    removedDeleteFiles.addAll(danglingPositionDeletes(entries, minDataSeqNumberPerPartition));

Review Comment:
   Why do we have to deal with equality and position deletes separately? Can't we check the content type in the filter? The join condition will be the same, right?
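[Editor's note] A rough sketch of the single-pass variant the last comment asks about, reusing `entries` and `minDataSeqNumberPerPartition` from the quoted diff and the standard `FileContent` ordinals (0 = data, 1 = position deletes, 2 = equality deletes). One caveat: position deletes apply to data files with an equal or lower data sequence number, while equality deletes apply only to strictly lower ones, so the join is shared but the dangling predicate differs by one comparison:

```java
// Sketch only: join all live delete entries against the per-partition
// minimum data sequence number once, then express the per-content-type
// dangling condition in a single filter.
Dataset<Row> deletes = entries.filter("content != 0"); // delete files only
Dataset<Row> dangling =
    deletes
        .join(
            minDataSeqNumberPerPartition,
            deletes
                .col("partition")
                .equalTo(minDataSeqNumberPerPartition.col("partition"))
                .and(deletes.col("spec_id").equalTo(minDataSeqNumberPerPartition.col("spec_id"))))
        .filter(
            "(content = 1 AND sequence_number < min_data_sequence_number) OR "
                + "(content = 2 AND sequence_number <= min_data_sequence_number)");
```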