danielcweeks commented on code in PR #10233:
URL: https://github.com/apache/iceberg/pull/10233#discussion_r1581856455


##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -166,23 +178,106 @@ public void deletePrefix(String prefix) {
   @Override
   public void deleteFiles(Iterable<String> pathsToDelete) throws 
BulkDeletionFailureException {
     AtomicInteger failureCount = new AtomicInteger(0);
-    Tasks.foreach(pathsToDelete)
-        .executeWith(executorService())
-        .retry(DELETE_RETRY_ATTEMPTS)
-        .stopRetryOn(FileNotFoundException.class)
-        .suppressFailureWhenFinished()
-        .onFailure(
-            (f, e) -> {
-              LOG.error("Failure during bulk delete on file: {} ", f, e);
-              failureCount.incrementAndGet();
-            })
-        .run(this::deleteFile);
-
+    if (WrappedIO.isBulkDeleteAvailable()) {
+      failureCount.set(bulkDeleteFiles(pathsToDelete));
+    } else {
+      Tasks.foreach(pathsToDelete)
+          .executeWith(executorService())
+          .retry(DELETE_RETRY_ATTEMPTS)
+          .stopRetryOn(FileNotFoundException.class)
+          .suppressFailureWhenFinished()
+          .onFailure(
+              (f, e) -> {
+                LOG.error("Failure during bulk delete on file: {} ", f, e);
+                failureCount.incrementAndGet();
+              })
+          .run(this::deleteFile);
+    }
     if (failureCount.get() != 0) {
       throw new BulkDeletionFailureException(failureCount.get());
     }
   }
 
+  /**
+   * Bulk delete files.
+   * This has to support a list spanning multiple filesystems, so we group the 
paths by filesystem
+   * of schema + host.
+   * @param pathnames paths to delete.
+   * @return count of failures.
+   */
+  private int bulkDeleteFiles(Iterable<String> pathnames) {
+
+    LOG.debug("Using bulk delete operation to delete files");
+
+    SetMultimap<String, Path> fsMap =
+        Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<Future<List<Map.Entry<Path, String>>>> deletionTasks = 
Lists.newArrayList();
+    for (String path : pathnames) {
+      Path p = new Path(path);
+      final URI uri = p.toUri();
+      String fsURI = uri.getScheme() + "://" + uri.getHost() + "/";

Review Comment:
   Using the `URI` class has some significant incompatibilities for various 
cloud providers like GCP who allow non-standard characters (e.g. underscore in 
bucket name) which will result in the host being empty/null and causing 
problems.  We use classes like `GCSLocation` or `S3URI` to workaround these 
issues.  We should try to avoid use of `URI`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to