aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1105204305


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -71,12 +71,15 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
   DeleteOrphanFiles deleteWith(Consumer<String> deleteFunc);
 
   /**
-   * Passes an alternative executor service that will be used for removing orphaned files.
-   *
-   * <p>If this method is not called, orphaned manifests and data files will still be deleted in the
-   * current thread.
-   *
-   * <p>
+   * Passes an alternative executor service that will be used for removing orphaned files. This
+   * service will only be used if a custom delete function is provided by {@link
+   * #deleteWith(Consumer)} or if the FileIO does not {@link
+   * org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. Otherwise, parallelism

Review Comment:
   nit: Sometimes, we add qualified imports to shorten such references. Up to you, though.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -71,12 +71,15 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
   DeleteOrphanFiles deleteWith(Consumer<String> deleteFunc);
 
   /**
-   * Passes an alternative executor service that will be used for removing orphaned files.
-   *
-   * <p>If this method is not called, orphaned manifests and data files will still be deleted in the
-   * current thread.
-   *
-   * <p>
+   * Passes an alternative executor service that will be used for removing orphaned files. This

Review Comment:
   What about preserving the empty line after a short description of the method?



##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -77,20 +77,25 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
    * @param deleteFunc a function that will be called to delete manifests and data files
    * @return this for method chaining
    */
+  @Deprecated

Review Comment:
   Left over from the initial implementation?



##########
api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java:
##########
@@ -44,9 +44,11 @@
   DeleteReachableFiles deleteWith(Consumer<String> deleteFunc);
 
   /**
-   * Passes an alternative executor service that will be used for files removal.
-   *
-   * <p>If this method is not called, files will be deleted in the current thread.
+   * Passes an alternative executor service that will be used for files removal. This service will
+   * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+   * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.
+   * Otherwise, parallelism should be controlled by the IO specific {@link
+   * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method.

Review Comment:
   What about keeping the last sentence about what happens if not called?



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);
+
+    if (failureCount.get() != 0) {
+      throw new BulkDeletionFailureException(failureCount.get());
+    }
+  }
+
+  private int deleteThreads() {
+    return conf()

Review Comment:
   nit: What about an extra var to stay on one line?
   
   ```
   int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
   return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue);
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,23 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) {

Review Comment:
   nit: What about inverting this condition instead of using negation?
   
   ```
   if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
     // bulk deletes
   } else {
     // non-bulk delete
   }
   ```



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);
+
+    if (failureCount.get() != 0) {
+      throw new BulkDeletionFailureException(failureCount.get());
+    }
+  }
+
+  private int deleteThreads() {
+    return conf()
+        .getInt(
+            DELETE_FILE_PARALLELISM,
+            Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE);
+  }
+
+  private ExecutorService executorService() {
+    if (executorService == null) {
+      synchronized (HadoopFileIO.class) {
+        if (executorService == null) {
+          executorService =
+              ThreadPools.newWorkerPool("iceberg-hadoopfileio-delete", deleteThreads());

Review Comment:
   nit: What about a constant to hold the pool name prefix? Maybe it will then fit on one line too.
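
To make the suggestion concrete, here is a minimal, self-contained sketch of the lazy pool initialization with the name prefix pulled into a constant. Everything here is an assumption for illustration: `DeletePoolHolder`, `DELETE_POOL_NAME`, and the plain `Executors` pool stand in for the real `HadoopFileIO` field and `ThreadPools.newWorkerPool`. The field declaration is not visible in this hunk, so the `volatile` modifier is also an assumption; double-checked locking relies on it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the lazy pool setup in HadoopFileIO.
final class DeletePoolHolder {
  // Hypothetical constant name; the value is the prefix used in the diff.
  static final String DELETE_POOL_NAME = "iceberg-hadoopfileio-delete";

  // Assumed to be volatile so that double-checked locking publishes the pool safely.
  private static volatile ExecutorService executorService;

  private DeletePoolHolder() {}

  static ExecutorService executorService(int threads) {
    if (executorService == null) {
      synchronized (DeletePoolHolder.class) {
        if (executorService == null) {
          // With the constant, the creation call fits on one line.
          executorService = Executors.newFixedThreadPool(threads, namedFactory(DELETE_POOL_NAME));
        }
      }
    }
    return executorService;
  }

  // Names threads "<prefix>-<n>" and marks them as daemons so they never block JVM exit.
  private static ThreadFactory namedFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }
}
```

The pool is created once; later calls return the same instance regardless of the `threads` argument.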



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -18,27 +18,42 @@
  */
 package org.apache.iceberg.hadoop;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {
+public class HadoopFileIO
+    implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism";

Review Comment:
   nit: This var name sounds more like a value constant than a config name.
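
One possible way to separate the two kinds of constants, sketched under assumptions: the `_KEY` suffix, the class name, and the value `4` for the core multiple are hypothetical (the diff does not show the multiple's value), and `java.util.Properties` stands in for the Hadoop `Configuration` so the snippet runs on its own.

```java
import java.util.Properties;

// Hypothetical sketch: a *_KEY suffix marks config names, a DEFAULT_* prefix marks values.
final class DeleteParallelismConfig {
  static final String DELETE_FILE_PARALLELISM_KEY = "iceberg.hadoop.delete_file_parallelism";
  // Assumed value for illustration only; the diff does not show it.
  static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;

  private DeleteParallelismConfig() {}

  // Properties is a stand-in for org.apache.hadoop.conf.Configuration here.
  static int deleteThreads(Properties conf) {
    int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
    String configured = conf.getProperty(DELETE_FILE_PARALLELISM_KEY);
    return configured == null ? defaultValue : Integer.parseInt(configured.trim());
  }
}
```

With this naming, a reader can tell at a glance which constant is looked up in the configuration and which one feeds the fallback value.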



##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -77,20 +77,25 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
    * @param deleteFunc a function that will be called to delete manifests and data files
    * @return this for method chaining
    */
+  @Deprecated
   ExpireSnapshots deleteWith(Consumer<String> deleteFunc);
 
   /**
-   * Passes an alternative executor service that will be used for manifests, data and delete files
-   * deletion.
+   * Passes an alternative executor service that will be used for files removal. This service will
+   * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+   * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.
+   * Otherwise, parallelism should be controlled by the IO specific {@link
+   * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method.
    *
-   * <p>If this method is not called, unnecessary manifests and content files will still be deleted
-   * in the current thread.
+   * <p>If this method is not called and bulk deletes are not supported, unnecessary manifests and
+   * content files will still be deleted in the current thread.
    *
    * <p>Identical to {@link org.apache.iceberg.ExpireSnapshots#executeDeleteWith(ExecutorService)}
    *
    * @param executorService the service to use
    * @return this for method chaining
    */
+  @Deprecated

Review Comment:
   Also not needed anymore?



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,45 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)

Review Comment:
   +1 to just focus on bulk deletes in this PR.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_GROUP_SIZE = 100000;

Review Comment:
   We would need to make sure this logic works with the streaming iterator in Spark.
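
For reference, a rough sketch of what grouping could look like when the input is a streaming iterator: each group is built only when requested, so at most one group of paths is held in memory at a time. `GroupingIterator` is a hypothetical helper written for this comment, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: lazily groups a (possibly streaming) iterator into
// lists of at most groupSize elements without materializing the whole input.
final class GroupingIterator<T> implements Iterator<List<T>> {
  private final Iterator<T> source;
  private final int groupSize;

  GroupingIterator(Iterator<T> source, int groupSize) {
    if (groupSize <= 0) {
      throw new IllegalArgumentException("groupSize must be positive: " + groupSize);
    }
    this.source = source;
    this.groupSize = groupSize;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public List<T> next() {
    if (!source.hasNext()) {
      throw new NoSuchElementException();
    }
    // Pull from the source only until the current group is full or the source is drained.
    List<T> group = new ArrayList<>();
    while (group.size() < groupSize && source.hasNext()) {
      group.add(source.next());
    }
    return group;
  }
}
```

Each returned group could then be handed to a bulk delete call; the last group may be smaller than `groupSize`.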



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)

Review Comment:
   How did we come up with this number? What do we use in other places?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

