amogh-jahagirdar commented on code in PR #13245:
URL: https://github.com/apache/iceberg/pull/13245#discussion_r2165581088


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -377,11 +377,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, IsolationLevel isolation
       this.isolationLevel = isolationLevel;
     }
 
-    private List<DataFile> overwrittenFiles() {
+    private DataFileSet overwrittenFiles() {
       if (scan == null) {
-        return ImmutableList.of();
+        return DataFileSet.create();
       } else {
-        return scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
+        return scan.tasks().stream()
+            .map(FileScanTask::file)
+            .collect(Collectors.toCollection(DataFileSet::create));
+      }
+    }
+
+    private DeleteFileSet rewritableDeletes() {
+      if (scan == null) {
+        return DeleteFileSet.create();
+      } else {
+        return scan.tasks().stream()
+            .flatMap(task -> task.deletes().stream())
+            .collect(Collectors.toCollection(DeleteFileSet::create));
       }

Review Comment:
   Ah, nvm: the scan tasks may still need to apply delete files if the table was in MoR mode earlier, and if we're doing a copy-on-write that removes an existing file, we still need to make sure its orphaned DVs are removed.
   
   So in that case, this is required.
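A minimal sketch of the situation described in the comment above, using hypothetical simplified records (`DeleteFile`, `ScanTask`) rather than Iceberg's real `DeleteFile`/`FileScanTask` interfaces, and a `LinkedHashSet` standing in for `DeleteFileSet`. A data file written while the table was in MoR mode can still carry a DV in its scan tasks, so a copy-on-write rewrite that drops that data file also collects the attached DVs for removal. The file names are made up for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RewritableDeletes {
  // Hypothetical, simplified stand-ins; not Iceberg's actual types.
  record DeleteFile(String location, String format) {}

  record ScanTask(String dataFileLocation, List<DeleteFile> deletes) {}

  // Collects every delete file attached to the scanned tasks. The set drops
  // duplicates, e.g. when two split tasks of the same data file share one DV.
  static Set<DeleteFile> rewritableDeletes(List<ScanTask> tasks) {
    if (tasks == null) {
      // Mirrors the `scan == null` branch: nothing scanned, nothing to remove.
      return new LinkedHashSet<>();
    }
    return tasks.stream()
        .flatMap(task -> task.deletes().stream())
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }

  public static void main(String[] args) {
    DeleteFile dv = new DeleteFile("a-dv.puffin", "PUFFIN");
    List<ScanTask> tasks = List.of(
        new ScanTask("a.parquet", List.of(dv)),  // split 1 of a file written under MoR
        new ScanTask("a.parquet", List.of(dv)),  // split 2 of the same file
        new ScanTask("b.parquet", List.of()));   // file with no deletes
    System.out.println(rewritableDeletes(tasks).size()); // prints 1
  }
}
```

Collecting into a set rather than a list is what keeps the shared DV from being listed twice when a large data file is planned as several split tasks.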



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -377,11 +377,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, IsolationLevel isolation
       this.isolationLevel = isolationLevel;
     }
 
-    private List<DataFile> overwrittenFiles() {
+    private DataFileSet overwrittenFiles() {
       if (scan == null) {
-        return ImmutableList.of();
+        return DataFileSet.create();
       } else {
-        return scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
+        return scan.tasks().stream()
+            .map(FileScanTask::file)
+            .collect(Collectors.toCollection(DataFileSet::create));
+      }
+    }
+
+    private DeleteFileSet rewritableDeletes() {
+      if (scan == null) {
+        return DeleteFileSet.create();
+      } else {
+        return scan.tasks().stream()
+            .flatMap(task -> task.deletes().stream())
+            .collect(Collectors.toCollection(DeleteFileSet::create));
       }

Review Comment:
   We won't have deletes in this case, right (this is in the copy-on-write implementation)? I don't think this would be needed here.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -172,14 +172,16 @@ public RewriteDataFiles.Result execute() {
         partialProgressEnabled
             ? doExecuteWithPartialProgress(plan, commitManager(startingSnapshotId))
             : doExecute(plan, commitManager(startingSnapshotId));
+    ImmutableRewriteDataFiles.Result result = resultBuilder.build();
 
     if (removeDanglingDeletes) {
       RemoveDanglingDeletesSparkAction action =
           new RemoveDanglingDeletesSparkAction(spark(), table);
       int removedCount = Iterables.size(action.execute().removedDeleteFiles());
-      resultBuilder.removedDeleteFilesCount(removedCount);
+      return result.withRemovedDeleteFilesCount(result.removedDeleteFilesCount() + removedCount);

Review Comment:
   Making sure I follow the math: `result.removedDeleteFilesCount()` is how many orphaned DVs were removed by the rewrite, and the existing `removedCount` is how many dangling deletes were removed afterwards?
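A minimal sketch of the arithmetic under the reading in the question above, assuming `result.removedDeleteFilesCount()` counts orphaned DVs dropped by the rewrite and `removedCount` counts dangling deletes removed by the follow-up action. The `Result` record and its `rewrittenDataFilesCount` field are hypothetical stand-ins; in the diff, `ImmutableRewriteDataFiles.Result` appears to be an Immutables-generated type whose `with*` method returns a modified copy, which is why the code returns a new result instead of mutating the builder.

```java
class RemovedDeleteCount {
  // Hypothetical stand-in for the immutable result type; not Iceberg's class.
  record Result(int rewrittenDataFilesCount, int removedDeleteFilesCount) {
    // Like an Immutables with* method: returns a copy, leaves `this` unchanged.
    Result withRemovedDeleteFilesCount(int count) {
      return new Result(rewrittenDataFilesCount, count);
    }
  }

  // Total removed = orphaned DVs removed during the rewrite commit
  //               + dangling deletes removed by the follow-up action.
  static Result addDanglingDeletes(Result result, int danglingRemoved) {
    return result.withRemovedDeleteFilesCount(
        result.removedDeleteFilesCount() + danglingRemoved);
  }

  public static void main(String[] args) {
    Result afterRewrite = new Result(10, 3);            // 3 orphaned DVs removed
    Result total = addDanglingDeletes(afterRewrite, 2); // 2 dangling deletes removed
    System.out.println(total.removedDeleteFilesCount());        // prints 5
    System.out.println(afterRewrite.removedDeleteFilesCount()); // prints 3
  }
}
```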



##########
core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java:
##########
@@ -59,6 +62,12 @@ public Set<DataFile> rewrittenFiles() {
         .collect(Collectors.toCollection(DataFileSet::create));
   }
 
+  public Set<DeleteFile> rewritableDeletes() {
+    return fileScanTasks().stream()
+        .flatMap(task -> task.deletes().stream().filter(f -> f.format() == FileFormat.PUFFIN))

Review Comment:
   nit: filter(ContentFileUtil::isDV)?
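A minimal sketch of the nit, using hypothetical stand-in types (`FileFormat`, `DeleteFile`) rather than Iceberg's: moving the format check into a named predicate lets the stream read as `filter(DvFilter::isDV)`. The stand-in `isDV` simply mirrors the inline `f.format() == FileFormat.PUFFIN` check from the diff; it is not a copy of `ContentFileUtil.isDV`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class DvFilter {
  // Hypothetical, simplified stand-ins; not Iceberg's actual types.
  enum FileFormat { PARQUET, PUFFIN }

  record DeleteFile(String location, FileFormat format) {}

  // Named predicate playing the role of a helper like ContentFileUtil.isDV.
  // Like the lambda in the diff, it treats Puffin delete files as DVs.
  static boolean isDV(DeleteFile file) {
    return file.format() == FileFormat.PUFFIN;
  }

  // Keeps only the DVs, using a method reference instead of an inline lambda.
  static Set<DeleteFile> dvsOnly(List<DeleteFile> deletes) {
    return deletes.stream()
        .filter(DvFilter::isDV)
        .collect(Collectors.toCollection(LinkedHashSet::new));
  }

  public static void main(String[] args) {
    List<DeleteFile> deletes = List.of(
        new DeleteFile("a-deletes.parquet", FileFormat.PARQUET), // position delete file
        new DeleteFile("a-dv.puffin", FileFormat.PUFFIN));       // DV
    System.out.println(dvsOnly(deletes).size()); // prints 1
  }
}
```

Besides reading better, a single named predicate means the definition of "is a DV" lives in one place if it ever needs to change.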



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

