aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1121109446


##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +165,45 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();

Review Comment:
   This is going to increment the count on each failed attempt and won't be 
accurate. We could count the number of successfully deleted files instead and 
then use `Iterables.size(pathsToDelete)` to find how many we were supposed to 
delete.
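
   For illustration, a rough sketch of counting successes instead (`successCount` is a placeholder name; `Iterables` is assumed to be the relocated Guava class already used elsewhere in core):
   
   ```
   AtomicInteger successCount = new AtomicInteger(0);
   
   Tasks.foreach(pathsToDelete)
       .executeWith(executorService())
       .retry(3)
       .stopRetryOn(FileNotFoundException.class)
       .suppressFailureWhenFinished()
       .onFailure((f, e) -> LOG.error("Failure during bulk delete on file: {}", f, e))
       .run(path -> {
         deleteFile(path);
         successCount.incrementAndGet();
       });
   
   int failureCount = Iterables.size(pathsToDelete) - successCount.get();
   if (failureCount != 0) {
     throw new BulkDeletionFailureException(failureCount);
   }
   ```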



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +165,45 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)

Review Comment:
   nit: Shall we define a constant for this too?
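
   For example (the constant name below is just a suggestion):
   
   ```
   private static final int DELETE_RETRY_ATTEMPTS = 3;
   ...
       .retry(DELETE_RETRY_ATTEMPTS)
   ```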



##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -80,11 +80,14 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
   ExpireSnapshots deleteWith(Consumer<String> deleteFunc);
 
   /**
-   * Passes an alternative executor service that will be used for manifests, data and delete files
-   * deletion.
+   * Passes an alternative executor service that will be used for files removal. This service will
+   * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+   * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.

Review Comment:
   nit: Shall we import `SupportsBulkOperations` directly like in the two 
classes earlier?
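
   For example:
   
   ```
   import org.apache.iceberg.io.SupportsBulkOperations;
   ...
    * FileIO does not {@link SupportsBulkOperations support bulk deletes}.
   ```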



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
+      String type = entry.getKey();
+      Collection<String> paths = entry.getValue();
+      int failures = 0;
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException bulkDeletionFailureException) {

Review Comment:
   nit: Is there value in such a long exception name compared to `e`?
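
   For example:
   
   ```
   } catch (BulkDeletionFailureException e) {
     failures = e.numberFailedObjects();
   }
   ```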



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;

Review Comment:
   nit: What about an empty line before if/else blocks?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {

Review Comment:
   Can we invert the condition? The negation is hard to interpret.
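
   Roughly:
   
   ```
   if (deleteFunc == null && io instanceof SupportsBulkOperations) {
     // bulk deletes
   } else {
     // non-bulk deletes
   }
   ```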



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -98,6 +101,13 @@ public InternalRow[] call(InternalRow args) {
     String location = args.isNullAt(2) ? null : args.getString(2);
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
     Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+    if (maxConcurrentDeletes != null) {

Review Comment:
   Same here.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
+
+    } else {
+      if (deleteFunc == null) {
+        LOG.info("Bulk Deletes are not Supported by {}, using non-bulk 
deletes", table.io());
+        deleteFunc = defaultDelete;

Review Comment:
   I don't think it is a good idea to init this variable here. I summarized my 
thoughts in other actions below. One way to avoid this:
   
   ```
   if (deleteFunc == null) {
     LOG.info("Table IO does not support bulk operations, using non-bulk 
deletes");
   } else {
     LOG.info("Custom delete function provided, using non-bulk deletes");
   }
   
   Tasks.foreach(orphanFiles)
       .noRetry()
       ...
       .run(deleteFunc != null ? deleteFunc::accept : table.io()::deleteFile);
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -125,7 +126,7 @@ public void accept(String file) {
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;

Review Comment:
   I'd consider getting rid of `defaultDelete` and using 
`table.io()::deleteFile` explicitly.
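
   For example (only a sketch of the idea):
   
   ```
   private Consumer<String> deleteFunc = null;
   ...
       .run(deleteFunc != null ? deleteFunc::accept : table.io()::deleteFile);
   ```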



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =

Review Comment:
   nit: Just `ListMultimap` instead of `ImmutableListMultimap`?
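
   i.e.
   
   ```
   ListMultimap<String, FileInfo> filesByType = Multimaps.index(fileGroup, FileInfo::getType);
   ```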



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {

Review Comment:
   These multimap calls are pretty tricky, and separating them out seems like a good idea.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {

Review Comment:
   nit: What about adding empty lines after method args and prior to the for 
loop?
   
   ```
     private static void deleteFileGroup(
         List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
   
       ListMultimap<String, FileInfo> filesByType =
           Multimaps.index(fileGroup, FileInfo::getType);
       ListMultimap<String, String> pathsByType =
           Multimaps.transformValues(filesByType, FileInfo::getPath);
   
       for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
         ...
       }
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());
+
+    return summary;
+  }
+
+  private static void deleteFileGroup(
+      List<FileInfo> fileGroup, SupportsBulkOperations io, DeleteSummary summary) {
+    ImmutableListMultimap<String, FileInfo> filesByType =
+        Multimaps.index(fileGroup, FileInfo::getType);
+    ListMultimap<String, String> pathsByType =
+        Multimaps.transformValues(filesByType, FileInfo::getPath);
+    for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
+      String type = entry.getKey();
+      Collection<String> paths = entry.getValue();
+      int failures = 0;
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException bulkDeletionFailureException) {
+        failures = bulkDeletionFailureException.numberFailedObjects();
+      }
+      summary.deletedFiles(entry.getKey(), paths.size() - failures);

Review Comment:
   Shall we use the `type` variable defined above?
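
   i.e.
   
   ```
   summary.deletedFiles(type, paths.size() - failures);
   ```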



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk 
deletes.");
+        deleteFunc = defaultDelete;

Review Comment:
   If we follow the snippet above, we can remove `defaultDelete` completely.  



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +261,38 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(SupportsBulkOperations io, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(fileGroups)
+        .suppressFailureWhenFinished()
+        .run(fileGroup -> deleteFileGroup(fileGroup, io, summary));
+
+    LOG.info("Deleted {} total files with bulk deletes", 
summary.totalFilesCount());

Review Comment:
   I think this log message is redundant as each place that calls it logs the 
result afterwards. Each caller also logs whether it is going to use bulk 
deletes or not.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -54,6 +55,8 @@
   private static final Logger LOG = LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
 
   private final String metadataFileLocation;
+
+  @Deprecated
   private final Consumer<String> defaultDelete =

Review Comment:
   The code still uses this var even though it is marked as deprecated. Can we 
simply remove it?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk 
deletes.");

Review Comment:
   nit: Why are some words capitalized?
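
   For example:
   
   ```
   LOG.info("Table IO does not support bulk operations, using non-bulk deletes");
   ```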



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -93,6 +97,12 @@ public InternalRow[] call(InternalRow args) {
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
+    if (maxConcurrentDeletes != null) {
+      LOG.warn(

Review Comment:
   I don't think this description is accurate. I believe this property will 
have no impact now. 



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -265,7 +266,20 @@ private Set<Long> findExpiredSnapshotIds(
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) {

Review Comment:
   Same comments as in `DeleteReachableFilesSparkAction`.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);

Review Comment:
   Unlike non-bulk deletes, this may fail the entire action if there is a bulk 
delete exception. I think we will have to add a helper method like this or 
similar?
   
   ```
   private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
     try {
       io.deleteFiles(paths);
       LOG.info("Deleted {} files using bulk deletes", paths.size());
   
     } catch (BulkDeletionFailureException e) {
       int deletedFilesCount = paths.size() - e.numberFailedObjects();
       LOG.warn(
           "Deleted only {} of {} files using bulk deletes",
           deletedFilesCount,
           paths.size());
     }
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);
+
+    } else {
+      if (deleteFunc == null) {
+        LOG.info("Bulk Deletes are not Supported by {}, using non-bulk 
deletes", table.io());

Review Comment:
   nit: Can we align log messages with other actions?



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);
+
+    if (failureCount.get() != 0) {
+      throw new BulkDeletionFailureException(failureCount.get());
+    }
+  }
+
+  private int deleteThreads() {
+    return conf()

Review Comment:
   I still think having an extra variable here would make it a bit more 
readable.
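
   Roughly something like this (the property name and default here are only placeholders, since the actual values are cut off in the diff):
   
   ```
   private int deleteThreads() {
     int defaultThreads = Runtime.getRuntime().availableProcessors();
     return conf().getInt("iceberg.hadoop.delete-threads", defaultThreads);
   }
   ```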



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java:
##########
@@ -132,7 +135,20 @@ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
   }
 
   private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (deleteFunc != null || !(io instanceof SupportsBulkOperations)) {
+      if (deleteFunc == null) {
+        LOG.info("Table IO does not support Bulk Operations. Using non-bulk 
deletes.");
+        deleteFunc = defaultDelete;

Review Comment:
   Hm, it seems a bit odd to init this variable here. Also, setting this will 
change the behavior if the action is called multiple times. Can we pass delete 
func explicitly?
   
   ```
   if (deleteFunc == null && io instanceof SupportsBulkOperations) {
     LOG.info("Table IO supports bulk operations, using bulk deletes");
     summary = deleteFiles((SupportsBulkOperations) io, files);
   } else {
     if (deleteFunc == null) {
       LOG.info("Table IO does not support bulk operations, using non-bulk 
deletes");
       summary = deleteFiles(deleteExecutorService, io::deleteFile, files);
     } else {
       LOG.info("Custom delete function provided, using non-bulk deletes");
       summary = deleteFiles(deleteExecutorService, deleteFunc, files);
     }
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,24 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
+      ((SupportsBulkOperations) table.io()).deleteFiles(orphanFiles);

Review Comment:
   Logging the number of removed files like above may help debugging.


