aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1105204305
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -71,12 +71,15 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
DeleteOrphanFiles deleteWith(Consumer<String> deleteFunc);
/**
- * Passes an alternative executor service that will be used for removing orphaned files.
- *
- * <p>If this method is not called, orphaned manifests and data files will still be deleted in the
- * current thread.
- *
- * <p>
+ * Passes an alternative executor service that will be used for removing orphaned files. This
+ * service will only be used if a custom delete function is provided by {@link
+ * #deleteWith(Consumer)} or if the FileIO does not {@link
+ * org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}. Otherwise, parallelism
Review Comment:
nit: Sometimes, we add qualified imports to shorten such references. Up to you, though.
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -71,12 +71,15 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
DeleteOrphanFiles deleteWith(Consumer<String> deleteFunc);
/**
- * Passes an alternative executor service that will be used for removing orphaned files.
- *
- * <p>If this method is not called, orphaned manifests and data files will still be deleted in the
- * current thread.
- *
- * <p>
+ * Passes an alternative executor service that will be used for removing orphaned files. This
Review Comment:
What about preserving the empty line after a short description of the method?
##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -77,20 +77,25 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
* @param deleteFunc a function that will be called to delete manifests and data files
* @return this for method chaining
*/
+ @Deprecated
Review Comment:
Left over from the initial implementation?
##########
api/src/main/java/org/apache/iceberg/actions/DeleteReachableFiles.java:
##########
@@ -44,9 +44,11 @@
DeleteReachableFiles deleteWith(Consumer<String> deleteFunc);
/**
- * Passes an alternative executor service that will be used for files removal.
- *
- * <p>If this method is not called, files will be deleted in the current thread.
+ * Passes an alternative executor service that will be used for files removal. This service will
+ * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+ * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.
+ * Otherwise, parallelism should be controlled by the IO specific {@link
+ * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method.
Review Comment:
What about keeping the last sentence about what happens if not called?
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
}
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(3)
+ .stopRetryOn(FileNotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (f, e) -> {
+ LOG.error("Failure during bulk delete on file: {} ", f, e);
+ failureCount.incrementAndGet();
+ })
+ .run(this::deleteFile);
+
+ if (failureCount.get() != 0) {
+ throw new BulkDeletionFailureException(failureCount.get());
+ }
+ }
+
+ private int deleteThreads() {
+ return conf()
Review Comment:
nit: What about an extra var to stay on one line?
```
int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
return conf().getInt(DELETE_FILE_PARALLELISM, defaultValue);
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +247,23 @@ private DeleteOrphanFiles.Result doExecute() {
List<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
- Tasks.foreach(orphanFiles)
- .noRetry()
- .executeWith(deleteExecutorService)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
- .run(deleteFunc::accept);
+ if (deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)) {
Review Comment:
nit: What about inverting this condition instead of using negation?
```
if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
// bulk deletes
} else {
// non-bulk delete
}
```
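For what it's worth, the inverted guard is equivalent to the original negated one by De Morgan's laws. A small standalone sketch (booleans stand in for the real `deleteFunc`/`instanceof SupportsBulkOperations` checks; all names here are illustrative, not from the PR) checks both forms on every input:

```java
// Equivalence check: the PR's negated condition vs. the reviewer's inverted one.
public class ConditionSketch {
  // Mirrors: deleteFunc != null || !(table.io() instanceof SupportsBulkOperations)
  static boolean original(boolean hasDeleteFunc, boolean supportsBulk) {
    return hasDeleteFunc || !supportsBulk;
  }

  // Mirrors taking the else-branch of: deleteFunc == null && io instanceof SupportsBulkOperations
  static boolean takesNonBulkBranch(boolean hasDeleteFunc, boolean supportsBulk) {
    boolean bulkBranch = !hasDeleteFunc && supportsBulk;
    return !bulkBranch;
  }

  public static void main(String[] args) {
    for (boolean f : new boolean[] {true, false}) {
      for (boolean b : new boolean[] {true, false}) {
        if (original(f, b) != takesNonBulkBranch(f, b)) {
          throw new AssertionError("conditions diverge");
        }
      }
    }
    System.out.println("equivalent");
  }
}
```

So the change is purely a readability call, as the nit says.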
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
}
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(3)
+ .stopRetryOn(FileNotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (f, e) -> {
+ LOG.error("Failure during bulk delete on file: {} ", f, e);
+ failureCount.incrementAndGet();
+ })
+ .run(this::deleteFile);
+
+ if (failureCount.get() != 0) {
+ throw new BulkDeletionFailureException(failureCount.get());
+ }
+ }
+
+ private int deleteThreads() {
+ return conf()
+ .getInt(
+ DELETE_FILE_PARALLELISM,
+ Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE);
+ }
+
+ private ExecutorService executorService() {
+ if (executorService == null) {
+ synchronized (HadoopFileIO.class) {
+ if (executorService == null) {
+ executorService =
+ ThreadPools.newWorkerPool("iceberg-hadoopfileio-delete", deleteThreads());
Review Comment:
nit: What about a constant to hold the pool name prefix? Maybe, it will then fit on one line too.
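One possible shape of that suggestion, sketched outside the PR (`DELETE_POOL_NAME` is an illustrative constant name, not something the patch defines):

```java
// Sketch only: hoisting the pool name into a constant so the
// ThreadPools.newWorkerPool(...) call site fits on one line.
public class PoolNameSketch {
  private static final String DELETE_POOL_NAME = "iceberg-hadoopfileio-delete";

  static String poolName() {
    return DELETE_POOL_NAME;
  }

  public static void main(String[] args) {
    // The call site would then read:
    //   executorService = ThreadPools.newWorkerPool(DELETE_POOL_NAME, deleteThreads());
    System.out.println(poolName());
  }
}
```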
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -18,27 +18,42 @@
*/
package org.apache.iceberg.hadoop;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {
+public class HadoopFileIO
+ implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
+ private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism";
Review Comment:
nit: This var name sounds more like a value constant rather than a config name.
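A hedged illustration of the naming point: a suffix can make the constant read as a config key rather than a value (`..._PROP` here is only a guessed convention, not Iceberg's actual style):

```java
// Sketch only: renaming the constant so it reads as a config key.
public class ConfigNameSketch {
  // Before (reads like a value): DELETE_FILE_PARALLELISM
  // After (reads like a key name):
  private static final String DELETE_FILE_PARALLELISM_PROP =
      "iceberg.hadoop.delete_file_parallelism";

  static String propKey() {
    return DELETE_FILE_PARALLELISM_PROP;
  }

  public static void main(String[] args) {
    System.out.println(propKey());
  }
}
```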
##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -77,20 +77,25 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
* @param deleteFunc a function that will be called to delete manifests and data files
* @return this for method chaining
*/
+ @Deprecated
ExpireSnapshots deleteWith(Consumer<String> deleteFunc);
/**
- * Passes an alternative executor service that will be used for manifests, data and delete files
- * deletion.
+ * Passes an alternative executor service that will be used for files removal. This service will
+ * only be used if a custom delete function is provided by {@link #deleteWith(Consumer)} or if the
+ * FileIO does not {@link org.apache.iceberg.io.SupportsBulkOperations support bulk deletes}.
+ * Otherwise, parallelism should be controlled by the IO specific {@link
+ * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable) deleteFiles} method.
*
- * <p>If this method is not called, unnecessary manifests and content files will still be deleted
- * in the current thread.
+ * <p>If this method is not called and bulk deletes are not supported, unnecessary manifests and
+ * content files will still be deleted in the current thread.
*
* <p>Identical to {@link org.apache.iceberg.ExpireSnapshots#executeDeleteWith(ExecutorService)}
*
* @param executorService the service to use
* @return this for method chaining
*/
+ @Deprecated
Review Comment:
Also not needed anymore?
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,45 @@ public void deletePrefix(String prefix) {
}
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(3)
+ .stopRetryOn(FileNotFoundException.class)
Review Comment:
+1 to just focus on bulk deletes in this PR.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
private static final int DELETE_NUM_RETRIES = 3;
+ private static final int DELETE_GROUP_SIZE = 100000;
Review Comment:
We would need to make sure this logic works with the streaming iterator in Spark.
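To illustrate the concern: when the paths arrive as a streaming iterator, grouping into `DELETE_GROUP_SIZE` batches has to stay lazy (one group materialized at a time) rather than collecting the whole iterator up front. A self-contained sketch of that shape, with all names illustrative rather than taken from the PR:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Lazy grouping over a streaming iterator: only one group of paths is held
// in memory at a time before being handed to a bulk delete.
public class GroupedDeleteSketch {
  static List<List<String>> deleteInGroups(Iterator<String> paths, int groupSize) {
    List<List<String>> deletedGroups = new ArrayList<>(); // stands in for io.deleteFiles(group)
    List<String> group = new ArrayList<>(groupSize);
    while (paths.hasNext()) {
      group.add(paths.next());
      if (group.size() == groupSize) {
        deletedGroups.add(group); // one bulk delete per full group
        group = new ArrayList<>(groupSize);
      }
    }
    if (!group.isEmpty()) {
      deletedGroups.add(group); // final partial group
    }
    return deletedGroups;
  }

  public static void main(String[] args) {
    Iterator<String> streaming = List.of("a", "b", "c", "d", "e").iterator();
    // groups: [a, b], [c, d], [e]
    System.out.println(deleteInGroups(streaming, 2).size());
  }
}
```

Guava's `Iterators.partition` does the same lazily and could likely be used instead of hand-rolling this.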
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,46 @@ public void deletePrefix(String prefix) {
}
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ AtomicInteger failureCount = new AtomicInteger(0);
+ Tasks.foreach(pathsToDelete)
+ .executeWith(executorService())
+ .retry(3)
Review Comment:
How did we come up with this number? What do we use in other places?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]