steveloughran commented on code in PR #10233:
URL: https://github.com/apache/iceberg/pull/10233#discussion_r1605102370


##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -166,23 +178,106 @@ public void deletePrefix(String prefix) {
   @Override
   public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
     AtomicInteger failureCount = new AtomicInteger(0);
-    Tasks.foreach(pathsToDelete)
-        .executeWith(executorService())
-        .retry(DELETE_RETRY_ATTEMPTS)
-        .stopRetryOn(FileNotFoundException.class)
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (f, e) -> {
-              LOG.error("Failure during bulk delete on file: {} ", f, e);
-              failureCount.incrementAndGet();
-            })
-        .run(this::deleteFile);
-
+    if (WrappedIO.isBulkDeleteAvailable()) {
+      failureCount.set(bulkDeleteFiles(pathsToDelete));
+    } else {
+      Tasks.foreach(pathsToDelete)
+          .executeWith(executorService())
+          .retry(DELETE_RETRY_ATTEMPTS)
+          .stopRetryOn(FileNotFoundException.class)
+          .suppressFailureWhenFinished()
+          .onFailure(
+              (f, e) -> {
+                LOG.error("Failure during bulk delete on file: {} ", f, e);
+                failureCount.incrementAndGet();
+              })
+          .run(this::deleteFile);
+    }
     if (failureCount.get() != 0) {
       throw new BulkDeletionFailureException(failureCount.get());
     }
   }
 
+  /**
+   * Bulk delete files.
+   * This has to support a list spanning multiple filesystems, so we group the paths by
+   * filesystem of scheme + host.
+   * @param pathnames paths to delete.
+   * @return count of failures.
+   */
+  private int bulkDeleteFiles(Iterable<String> pathnames) {
+
+    LOG.debug("Using bulk delete operation to delete files");
+
+    SetMultimap<String, Path> fsMap =
+        Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<Future<List<Map.Entry<Path, String>>>> deletionTasks = Lists.newArrayList();
+    for (String path : pathnames) {

Review Comment:
   that should be the case; this is based on the S3FileIO implementation, except that it works with every FileSystem instance rather than just S3 (we will offer the API with a page size of 1 everywhere).
   
   as soon as the number of entries for a filesystem matches the page size, a bulk delete call is submitted.
   
   for page size == 1 (classic filesystems, GCS) it's a convoluted way to get the same delete(path, false) call.
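   The paging behaviour described above can be illustrated with a small, self-contained sketch (class and field names are made up for illustration; the `Consumer` stands in for the real bulk delete call). Note how pageSize == 1 degenerates to one submission per file:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   // Hypothetical illustration of page-triggered submission: buffer paths
   // and submit one "bulk delete" each time the buffer fills a page.
   // With pageSize == 1, every add() submits immediately (per-file delete).
   public class PagedDeleter {
     private final int pageSize;
     private final List<String> buffer = new ArrayList<>();
     private final Consumer<List<String>> submitter; // stands in for the real bulk delete call
     int pagesSubmitted = 0;

     PagedDeleter(int pageSize, Consumer<List<String>> submitter) {
       this.pageSize = pageSize;
       this.submitter = submitter;
     }

     void add(String path) {
       buffer.add(path);
       if (buffer.size() >= pageSize) {
         flush(); // page full: submit one bulk delete call
       }
     }

     void flush() {
       if (!buffer.isEmpty()) {
         submitter.accept(new ArrayList<>(buffer));
         pagesSubmitted++;
         buffer.clear();
       }
     }

     public static void main(String[] args) {
       PagedDeleter d = new PagedDeleter(2, page -> {});
       d.add("s3a://b/1");
       d.add("s3a://b/2"); // page of 2 reached: first submission
       d.add("s3a://b/3");
       d.flush(); // trailing partial page: second submission
       System.out.println(d.pagesSubmitted); // 2
     }
   }
   ```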
   
   for s3:
   * page size > 1: the code can build up a large page before submitting; the page size is set by the existing s3a page-size option. No safety checks or guarantees other than "objects at the path without a trailing / will be deleted"; no attempt to recreate the parent dir.
   * page size == 1 (s3a with bulk delete disabled): a single object at the path without a trailing / is deleted; no attempt to recreate the parent dir.
   
   which means that even when bulk delete is disabled (as on some third-party stores, including GCS), it's still more efficient, as it saves two LISTs and more.
   
   for the branch-3.3 implementation we are likely to just offer that single-but-less-chatty delete...the move to the v2 SDK means backporting complex stuff is now impossible. But we want to provide the reflection-friendly API so that it can be used easily.
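   A reflection-friendly capability check, in the spirit of the WrappedIO.isBulkDeleteAvailable() call in the diff, can be sketched as a classpath probe (this is a simplified illustration, not the actual WrappedIO implementation): look the class up by name so the caller still links and runs against Hadoop releases that predate the bulk delete API, and fall back to per-file deletes when it is absent.

   ```java
   // Hypothetical sketch of a reflection-based capability probe: returns
   // true only when the named class is present and linkable, so callers
   // can gate use of a newer API behind a runtime check.
   public class CapabilityProbe {

     static boolean classAvailable(String className) {
       try {
         Class.forName(className);
         return true;
       } catch (ClassNotFoundException | LinkageError e) {
         // class missing or failed to link: the capability is unavailable
         return false;
       }
     }

     public static void main(String[] args) {
       // java.util.List is always present; the second name is deliberately bogus
       System.out.println(classAvailable("java.util.List")); // true
       System.out.println(classAvailable("org.example.NoSuchClass")); // false
     }
   }
   ```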



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

