aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1121109446
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +165,45 @@ public void deletePrefix(String prefix) {
     }
   }

+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();

Review Comment:
   This is going to increment the count on each failed attempt and won't be accurate. We could count the number of successfully deleted files instead and then use `Iterables.size(pathsToDelete)` to find how many we were supposed to delete.

##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +165,45 @@ public void deletePrefix(String prefix) {
     }
   }

+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)

Review Comment:
   nit: Shall we define a constant for this too?

##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -80,11 +80,14 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
   ExpireSnapshots deleteWith(Consumer<String> deleteFunc);

   /**
-   * Passes an alternative executor service that will be used for manifests, data and delete files
-   * deletion.
+   * Passes an alternative executor service that will be used for files removal. This service will
+   * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+   * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.

Review Comment:
   nit: Shall we import `SupportsBulkOperations` directly like in the two classes earlier?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
+      String type = entry.getKey();
+      Collection<String> paths = entry.getValue();
+      int failures = 0;
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException bulkDeletionFailureException) {

Review Comment:
   nit: Is there value in such a long exception name compared to `e`?
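For illustration, a minimal sketch of the success-counting approach described in the first `HadoopFileIO.deleteFiles` comment above, with the retry count pulled into a named constant as the second comment suggests. This is not the PR's code: `DELETE_RETRY_ATTEMPTS` is a hypothetical name, and `Iterables` is assumed to be the (relocated) Guava utility already used elsewhere in Iceberg.

```
private static final int DELETE_RETRY_ATTEMPTS = 3; // hypothetical constant name

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  AtomicInteger successCount = new AtomicInteger(0);

  Tasks.foreach(pathsToDelete)
      .executeWith(executorService())
      .retry(DELETE_RETRY_ATTEMPTS)
      .stopRetryOn(FileNotFoundException.class)
      .suppressFailureWhenFinished()
      .onFailure((f, e) -> LOG.error("Failure during bulk delete on file: {}", f, e))
      .run(
          path -> {
            deleteFile(path);
            // count only deletes that actually succeeded, so retries cannot skew the number
            successCount.incrementAndGet();
          });

  // derive failures from the expected total rather than counting failed attempts
  int failureCount = Iterables.size(pathsToDelete) - successCount.get();
  if (failureCount != 0) {
    throw new BulkDeletionFailureException(failureCount);
  }
}
```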
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }

   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;

Review Comment:
   nit: What about an empty line before if/else blocks?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }

   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {

Review Comment:
   Can we invert the condition as it is hard to interpret negation?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -98,6 +101,13 @@ public InternalRow[] call(InternalRow args) {
     String location = args.isNullAt(2) ? null : args.getString(2);
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
     Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+    if (maxConcurrentDeletes != null) {

Review Comment:
   Same here.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
+
+    } else {
+      if (deleteFunc == null) {
+        LOG.info("Bulk Deletes are not Supported by {}, using non-bulk deletes", table.io());
+        deleteFunc = defaultDelete;

Review Comment:
   I don't think it is a good idea to init this variable here. I summarized my thoughts in other actions below. One way to avoid this:

```
if (deleteFunc == null) {
  LOG.info("Table IO does not support bulk operations, using non-bulk deletes");
} else {
  LOG.info("Custom delete function provided, using non-bulk deletes");
}

Tasks.foreach(orphanFiles)
    .noRetry()
    ...
    .run(deleteFunc != null ? deleteFunc::accept : table.io()::deleteFile);
```

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -125,7 +126,7 @@ public void accept(String file) {
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;

Review Comment:
   I'd consider getting rid of `defaultDelete` and using `table.io()::deleteFile` explicitly.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =

Review Comment:
   nit: Just `ListMultimap` instead of `ImmutableListMultimap`?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {

Review Comment:
   These multi maps calls are pretty tricky and separating them out seems like a good idea.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {

Review Comment:
   nit: What about adding empty lines after method args and prior to the for loop?
```
private static void deleteFileGroup(
    List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {

  ListMultimap<String, FileInfo> filesByType =
      Multimaps.index(fileGroup, FileInfo::getType);
  ListMultimap<String, String> pathsByType =
      Multimaps.transformValues(filesByType, FileInfo::getPath);

  for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
    ...
  }
}
```

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
+      String type = entry.getKey();
+      Collection<String> paths = entry.getValue();
+      int failures = 0;
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException bulkDeletionFailureException) {
+        failures = bulkDeletionFailureException.numberFailedObjects();
+      }
+      summary.deletedFiles(entry.getKey(), paths.size() - failures);

Review Comment:
   Shall we use `type` variable defined above?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }

   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes.");
+        deleteFunc = defaultDelete;

Review Comment:
   If we follow the snippet above, we can remove `defaultDelete` completely.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }

+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());

Review Comment:
   I think this log message is redundant as each place that calls it logs the result afterwards. Each caller also logs whether it is going to use bulk deletes or not.
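Putting the `BaseSparkAction` nits above together, one possible final shape for `deleteFileGroup` (interface-typed multimaps, blank lines around the grouping, the `type` local actually used, and the caught exception shortened to `e`) could look like the sketch below. This is an illustration of the suggestions, not the code that was merged.

```
private static void deleteFileGroup(
    List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {

  // group paths by content type (data, manifest, ...) before issuing bulk deletes
  ListMultimap<String, FileInfo> filesByType = Multimaps.index(fileGroup, FileInfo::getType);
  ListMultimap<String, String> pathsByType =
      Multimaps.transformValues(filesByType, FileInfo::getPath);

  for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
    String type = entry.getKey();
    Collection<String> paths = entry.getValue();

    int failures = 0;
    try {
      io.deleteFiles(paths);
    } catch (BulkDeletionFailureException e) {
      failures = e.numberFailedObjects();
    }

    summary.deletedFiles(type, paths.size() - failures);
  }
}
```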
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -54,6 +55,8 @@ private static final Logger LOG =
       LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);

   private final String metadataFileLocation;
+
+  @Deprecated private final Consumer<String> defaultDelete =

Review Comment:
   The code still uses this var even though it is marked as deprecated. Can we simply remove it?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }

   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes.");

Review Comment:
   nit: Why capital letter for some words?

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -93,6 +97,12 @@ public InternalRow[] call(InternalRow args) {
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
+    if (maxConcurrentDeletes != null) {
+      LOG.warn(

Review Comment:
   I don't think this description is accurate. I believe this property will have no impact now.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -265,7 +266,20 @@ private Set<Long> findExpiredSnapshotIds(
   }

   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) {

Review Comment:
   Same comments as in `DeleteReachableFilesSparkAction`.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);

Review Comment:
   Unlike non-bulk deletes, this may fail the entire action if there is a bulk delete exception. I think we will have to add a helper method like this or similar?
```
private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
  try {
    io.deleteFiles(paths);
    LOG.info("Deleted {} files using bulk deletes", paths.size());
  } catch (BulkDeletionFailureException e) {
    int deletedFilesCount = paths.size() - e.numberFailedObjects();
    LOG.warn(
        "Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
  }
}
```

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
+
+    } else {
+      if (deleteFunc == null) {
+        LOG.info("Bulk Deletes are not Supported by {}, using non-bulk deletes", table.io());

Review Comment:
   nit: Can we align log messages with other actions?

##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
     }
   }

+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);
+
+    if (failureCount.get() != 0) {
+      throw new BulkDeletionFailureException(failureCount.get());
+    }
+  }
+
+  private int deleteThreads() {
+    return conf()

Review Comment:
   I still think having an extra variable here would make it a bit more readable.

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }

   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk deletes.");
+        deleteFunc = defaultDelete;

Review Comment:
   Hm, it seems a bit odd to init this variable here. Also, setting this will change the behavior if the action is called multiple times. Can we pass delete func explicitly?
```
if (deleteFunc == null && io instanceof SupportsBulkOperations) {
  LOG.info("Table IO supports bulk operations, using bulk deletes");
  summary = deleteFiles((SupportsBulkOperations) io, files);
} else {
  if (deleteFunc == null) {
    LOG.info("Table IO does not support bulk operations, using non-bulk deletes");
    summary = deleteFiles(deleteExecutorService, io::deleteFile, files);
  } else {
    LOG.info("Custom delete function provided, using non-bulk deletes");
    summary = deleteFiles(deleteExecutorService, deleteFunc, files);
  }
}
```

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);

Review Comment:
   Logging the number of removed files like above may help debugging.
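As a side note on the `deleteThreads()` comment above, the "extra variable" could simply name the configured value (or its default) before it is returned, instead of returning the `conf()` chain directly. The property key and default below are placeholders, not the ones used in the PR.

```
private int deleteThreads() {
  // hypothetical key and default, named so the return statement stays short and readable
  int defaultThreads = Runtime.getRuntime().availableProcessors();
  return conf().getInt("iceberg.hadoop.delete-threads", defaultThreads);
}
```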