amogh-jahagirdar commented on code in PR #13245:
URL: https://github.com/apache/iceberg/pull/13245#discussion_r2165581088
##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -377,11 +377,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, IsolationLevel isolation
     this.isolationLevel = isolationLevel;
   }
 
-  private List<DataFile> overwrittenFiles() {
+  private DataFileSet overwrittenFiles() {
     if (scan == null) {
-      return ImmutableList.of();
+      return DataFileSet.create();
     } else {
-      return scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
+      return scan.tasks().stream()
+          .map(FileScanTask::file)
+          .collect(Collectors.toCollection(DataFileSet::create));
+    }
+  }
+
+  private DeleteFileSet rewritableDeletes() {
+    if (scan == null) {
+      return DeleteFileSet.create();
+    } else {
+      return scan.tasks().stream()
+          .flatMap(task -> task.deletes().stream())
+          .collect(Collectors.toCollection(DeleteFileSet::create));
     }

Review Comment:
   Ah, never mind: the scan tasks may still be applying delete files if the table was previously in MoR mode, and if a copy-on-write operation removes an existing data file, we still need to make sure the orphaned DVs are removed as well.
So in that case this is required.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -377,11 +377,23 @@ private CopyOnWriteOperation(SparkCopyOnWriteScan scan, IsolationLevel isolation
     this.isolationLevel = isolationLevel;
   }
 
-  private List<DataFile> overwrittenFiles() {
+  private DataFileSet overwrittenFiles() {
     if (scan == null) {
-      return ImmutableList.of();
+      return DataFileSet.create();
     } else {
-      return scan.tasks().stream().map(FileScanTask::file).collect(Collectors.toList());
+      return scan.tasks().stream()
+          .map(FileScanTask::file)
+          .collect(Collectors.toCollection(DataFileSet::create));
+    }
+  }
+
+  private DeleteFileSet rewritableDeletes() {
+    if (scan == null) {
+      return DeleteFileSet.create();
+    } else {
+      return scan.tasks().stream()
+          .flatMap(task -> task.deletes().stream())
+          .collect(Collectors.toCollection(DeleteFileSet::create));
     }

Review Comment:
   We won't have deletes in this case, right (this is the copy-on-write implementation)? I don't think this would be needed here.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -172,14 +172,16 @@ public RewriteDataFiles.Result execute() {
         partialProgressEnabled
             ?
 doExecuteWithPartialProgress(plan, commitManager(startingSnapshotId))
             : doExecute(plan, commitManager(startingSnapshotId));
 
+    ImmutableRewriteDataFiles.Result result = resultBuilder.build();
     if (removeDanglingDeletes) {
       RemoveDanglingDeletesSparkAction action = new RemoveDanglingDeletesSparkAction(spark(), table);
       int removedCount = Iterables.size(action.execute().removedDeleteFiles());
-      resultBuilder.removedDeleteFilesCount(removedCount);
+      return result.withRemovedDeleteFilesCount(result.removedDeleteFilesCount() + removedCount);

Review Comment:
   Making sure I follow the math: `result.removedDeleteFilesCount()` is how many orphaned DVs are removed, and the existing `removedCount` is how many dangling deletes are removed?

##########
core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java:
##########
@@ -59,6 +62,12 @@ public Set<DataFile> rewrittenFiles() {
         .collect(Collectors.toCollection(DataFileSet::create));
   }
 
+  public Set<DeleteFile> rewritableDeletes() {
+    return fileScanTasks().stream()
+        .flatMap(task -> task.deletes().stream().filter(f -> f.format() == FileFormat.PUFFIN))

Review Comment:
   nit: `filter(ContentFileUtil::isDV)`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
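A side note on why collecting into `DeleteFileSet` (rather than a `List`) matters in the `rewritableDeletes()` diff above: when a table is in MoR mode, the same DV can be referenced by several scan tasks, so a plain `flatMap` + `toList` would hand duplicate delete files to the commit. The stdlib-only sketch below illustrates the deduplication behavior; the `DeleteFile` and `Task` records here are hypothetical stand-ins for Iceberg's `DeleteFile`/`FileScanTask`, and `LinkedHashSet` stands in for `DeleteFileSet::create`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DeleteDedupSketch {
  // Stand-ins for Iceberg's DeleteFile / FileScanTask; records compare by value.
  record DeleteFile(String path) {}
  record Task(List<DeleteFile> deletes) {}

  public static void main(String[] args) {
    // One DV applied to two different scan tasks, plus a second DV.
    DeleteFile dv1 = new DeleteFile("dv-1.puffin");
    List<Task> tasks = List.of(
        new Task(List.of(dv1)),
        new Task(List.of(dv1, new DeleteFile("dv-2.puffin"))));

    // Collecting into a List keeps the duplicate reference to dv1...
    List<DeleteFile> asList = tasks.stream()
        .flatMap(t -> t.deletes().stream())
        .collect(Collectors.toList());

    // ...while collecting into a Set (analogous to
    // Collectors.toCollection(DeleteFileSet::create)) keeps each file once.
    Set<DeleteFile> asSet = tasks.stream()
        .flatMap(t -> t.deletes().stream())
        .collect(Collectors.toCollection(LinkedHashSet::new));

    System.out.println(asList.size()); // 3
    System.out.println(asSet.size());  // 2
  }
}
```

The real `DeleteFileSet` additionally keys equality on the delete file's path and content offsets rather than object identity, but the deduplication effect on the collected stream is the same.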