amogh-jahagirdar commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1089790123


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_GROUP_SIZE = 100000;

Review Comment:
   Is it feasible to make this value configurable through the options passed in to the action?
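   
   For illustration only, a minimal sketch, assuming `BaseSparkAction#options()` exposes the action options and using the existing `PropertyUtil` helper; the `"delete-group-size"` key is a hypothetical name, not an agreed-upon option:
   
   ```java
   // Hypothetical sketch: read the group size from the action options,
   // falling back to the current hard-coded default.
   // Assumes org.apache.iceberg.util.PropertyUtil and a "delete-group-size" option key.
   int deleteGroupSize =
       PropertyUtil.propertyAsInt(options(), "delete-group-size", DELETE_GROUP_SIZE);
   ```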



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -98,6 +101,12 @@ public InternalRow[] call(InternalRow args) {
     String location = args.isNullAt(2) ? null : args.getString(2);
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
     Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+    if (maxConcurrentDeletes != null) {
+      LOG.warn(
+          "{} is now deprecated, parallelism should now be configured in the FileIO bulk operations. Check the"
+              + " configured FileIO for more information",
+          PARAMETERS[4].name());
+    }

Review Comment:
   Style nit: add a new line after the if block.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -265,7 +285,15 @@ private Set<Long> findExpiredSnapshotIds(
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (ops.io() instanceof SupportsBulkOperations) {
+      LOG.info("Triggering Bulk Delete Operations");
+      summary = deleteFiles(bulkDeleteFunc, files);
+    } else {
+      LOG.warn("Warning falling back to non-bulk deletes");

Review Comment:
   Nit on the warn log: I think we should make it clearer why it's a warning, so users don't think anything bad is happening:
   
   "Falling back to non-bulk deletes. Bulk deletes are recommended for better deletion throughput"
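   
   As a concrete sketch of the suggested change (exact wording open to adjustment, of course):
   
   ```java
   // Clarifies that the fallback is about throughput, not a failure.
   LOG.warn(
       "Falling back to non-bulk deletes. Bulk deletes are recommended for better deletion throughput");
   ```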



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,45 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)

Review Comment:
   Perhaps this is out of scope for this PR, since for other delete operations we only stop retrying on `FileNotFoundException`, but it seems there are other cases where we should stop retrying, like permission-denied errors (see https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L2848) or `AccessControlException` in Hadoop cases.
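   
   If we did extend it, a sketch might look like the following, assuming `Tasks.Builder#stopRetryOn` accepts multiple exception classes and using Hadoop's `org.apache.hadoop.security.AccessControlException`:
   
   ```java
   // Sketch: also stop retrying on permission errors, since retries won't help there.
   Tasks.foreach(pathsToDelete)
       .executeWith(executorService())
       .retry(3)
       .stopRetryOn(FileNotFoundException.class, AccessControlException.class)
       // ... rest of the chain unchanged
   ```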


