coderfender commented on code in PR #12824:
URL: https://github.com/apache/iceberg/pull/12824#discussion_r2067555906

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -407,15 +409,49 @@ private Builder doExecuteWithPartialProgress(
   Stream<RewriteFileGroup> toGroupStream(
       RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
-    return groupsByPartition.entrySet().stream()
+    if (maxFilesToRewrite == null) {
+      return groupsByPartition.entrySet().stream()
+          .filter(e -> !e.getValue().isEmpty())
+          .flatMap(
+              e -> {
+                StructLike partition = e.getKey();
+                List<List<FileScanTask>> scanGroups = e.getValue();
+                return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+              })
+          .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+    }
+
+    LOG.info(
+        "Max files rewrite options provided for table {} with max file re-write value : {}",
+        table.name(),
+        maxFilesToRewrite);
+
+    List<RewriteFileGroup> selectedFileGroups = Lists.newArrayList();
+    AtomicInteger fileCountRunner = new AtomicInteger(0);
+
+    groupsByPartition.entrySet().stream()
+        .parallel()
         .filter(e -> !e.getValue().isEmpty())
-        .flatMap(
-            e -> {
-              StructLike partition = e.getKey();
-              List<List<FileScanTask>> scanGroups = e.getValue();
-              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
-            })
-        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+        .forEach(
+            entry -> {
+              StructLike partition = entry.getKey();
+              entry
+                  .getValue()
+                  .forEach(
+                      fileScanTasks -> {
+                        if (fileCountRunner.get() < maxFilesToRewrite) {
+                          int remainingSize = maxFilesToRewrite - fileCountRunner.get();

Review Comment:
   1. I am not sure we can ever call this scenario `Stuck`. Stuck essentially means that there are files which would never be processed. If users know that at most 500 files are rewritten per run and they have a spike event, they would know that a backlog has accrued and that it will eventually be processed (albeit slowly).
   2. Users could also set the option dynamically through simple scripting so that compaction runs at a proportionate scale (say, max files to rewrite = a percentage of the data files written per day).
   3. This option is intended to put an upper limit on the number of files processed in a single run, to improve reliability. In my use case, where we migrated an existing data warehouse (uncompacted or partially compacted), there were > 1 billion files, and issuing a rewrite would essentially break the name node. Having this option ensures files are processed at a practical pace so that compaction eventually finishes.
   4. In my experience compacting large data clusters (~25 PB and > 1 billion files), simple dashboards that overlay blocks vs. files help us tune the compaction scale, which I think belongs at the metrics layer rather than the framework layer. Returning uncompacted files (essentially total files - `max files to rewrite`) is probably not very useful IMO.
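
   To make points 2 and 3 concrete, here is a minimal, self-contained sketch of the capping behaviour under discussion. It is not the PR's implementation: `MaxFilesCapSketch`, `selectUpTo`, and `capFromDailyWrites` are hypothetical names, files are plain strings rather than `FileScanTask`, and truncating the last group so the total never exceeds the cap is an assumption of this sketch. It also walks the groups sequentially, which is a deliberate simplification and differs from the parallel stream in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class MaxFilesCapSketch {

  // Hypothetical helper for point 2: derive the cap from the number of files
  // written per day and a fraction chosen by the user (e.g. 0.05 for 5%).
  // The result is clamped to the range [1, Integer.MAX_VALUE].
  static int capFromDailyWrites(long filesWrittenPerDay, double fraction) {
    long cap = Math.round(filesWrittenPerDay * fraction);
    return (int) Math.max(1, Math.min(cap, Integer.MAX_VALUE));
  }

  // Hypothetical helper for point 3: walk the groups in order and keep files
  // until maxFiles is reached. Empty groups are skipped, and the last selected
  // group is truncated so the total never exceeds the cap (an assumption here).
  static <T> List<List<T>> selectUpTo(List<List<T>> groups, int maxFiles) {
    List<List<T>> selected = new ArrayList<>();
    int remaining = maxFiles;
    for (List<T> group : groups) {
      if (remaining <= 0) {
        break; // cap reached; the rest stays as backlog for a later run
      }
      if (group.isEmpty()) {
        continue;
      }
      int take = Math.min(remaining, group.size());
      selected.add(new ArrayList<>(group.subList(0, take)));
      remaining -= take;
    }
    return selected;
  }

  public static void main(String[] args) {
    List<List<String>> groups =
        List.of(List.of("a", "b", "c"), List.<String>of(), List.of("d", "e"), List.of("f"));

    // With a cap of 4, the first group is taken whole and the next one is truncated.
    System.out.println(selectUpTo(groups, 4)); // prints [[a, b, c], [d]]

    // A cap derived from 10,000 files written per day at 5%.
    System.out.println(capFromDailyWrites(10_000, 0.05)); // prints 500
  }
}
```

   Files left out by the cap are not lost; they remain candidates for the next run, which is the backlog behaviour described in point 1.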