sririshindra commented on code in PR #12824: URL: https://github.com/apache/iceberg/pull/12824#discussion_r2067539142
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -407,15 +409,49 @@ private Builder doExecuteWithPartialProgress(
   Stream<RewriteFileGroup> toGroupStream(
       RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
-    return groupsByPartition.entrySet().stream()
+    if (maxFilesToRewrite == null) {
+      return groupsByPartition.entrySet().stream()
+          .filter(e -> !e.getValue().isEmpty())
+          .flatMap(
+              e -> {
+                StructLike partition = e.getKey();
+                List<List<FileScanTask>> scanGroups = e.getValue();
+                return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+              })
+          .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+    }
+
+    LOG.info(
+        "Max files rewrite options provided for table {} with max file re-write value : {}",
+        table.name(),
+        maxFilesToRewrite);
+
+    List<RewriteFileGroup> selectedFileGroups = Lists.newArrayList();
+    AtomicInteger fileCountRunner = new AtomicInteger(0);
+
+    groupsByPartition.entrySet().stream()
+        .parallel()
         .filter(e -> !e.getValue().isEmpty())
-        .flatMap(
-            e -> {
-              StructLike partition = e.getKey();
-              List<List<FileScanTask>> scanGroups = e.getValue();
-              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
-            })
-        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+        .forEach(
+            entry -> {
+              StructLike partition = entry.getKey();
+              entry
+                  .getValue()
+                  .forEach(
+                      fileScanTasks -> {
+                        if (fileCountRunner.get() < maxFilesToRewrite) {
+                          int remainingSize = maxFilesToRewrite - fileCountRunner.get();

Review Comment:
@RussellSpitzer You're right, I might be overcomplicating the user profile a bit. Let's simplify the scenario and focus on the outcome.

Consider a user who *does* set `maxFilesToRewrite` based on their *average* daily data volume: they typically write around 500 files a day and set `maxFilesToRewrite` to 700 to cover the daily volume plus a small buffer, with compaction scheduled to run once a day. Most days this works fine; any slight overflow gets picked up the next day.

The issue arises during unpredictable data bursts, like a "Black Friday" event or an unexpected spike. If 10,000 files land on one day, the compaction job, still capped at rewriting 700 files by `maxFilesToRewrite`, skips a massive number of newly written files (over 9,000 in this example). Those skipped files form a significant backlog, and at only 700 files compacted per day it takes many days (potentially weeks) to clear. During that time, the thousands of files written on the burst day, plus many written on subsequent "normal" days while the backlog drains, just sit there uncompacted. They are effectively left behind because the `maxFilesToRewrite` limit prevents the compaction job from even considering them in that run. The skipped files are primarily the most recently written ones, and they keep aging while they wait to be included in a future limited compaction run. This is the scenario where files can stay "stuck" for a prolonged period if the backlog persists.
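To put rough numbers on that (a back-of-the-envelope estimate, assuming a single 10,000-file burst day, roughly 500 new files still arriving per day afterwards, and the cap left at 700):

    backlog after the burst day   ~ 10,000 written - 700 rewritten  = 9,300 files
    net drain on following days   ~ 700 rewritten - 500 new files   = 200 files/day
    time to clear the backlog     ~ 9,300 / 200                     ~ 47 days

So under these assumptions "weeks" is not an exaggeration; the backlog lingers for over a month unless the cap or the schedule changes.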
Maybe we should be thinking about giving users visibility into the backlog caused by the limit. Currently, the `rewrite_data_files` procedure reports `failed_data_files_count` as part of its output. As I understand it, that metric captures files that *failed* to be rewritten when `partial-progress.enabled` is true, but I don't believe it captures files that were simply *skipped* because the `maxFilesToRewrite` limit was reached before they could be considered.

If the procedure's output included a metric for how many eligible files were *skipped* because the `maxFilesToRewrite` limit was hit, users could monitor it. If that count is consistently high, or jumps after a data burst, they would have clear data telling them to:

1. Temporarily increase `maxFilesToRewrite`.
2. Run compaction more frequently until the skipped count drops.
3. Implement more dynamic scheduling logic based on the skipped count.

So perhaps we should add another field to the output that specifically reports the number of files left behind because the `maxFilesToRewrite` limit was hit. Alternatively, if `failed_data_files_count` is intended to cover this, we should clarify its definition in the documentation and, if necessary, adjust the implementation and add tests so that it correctly reflects files skipped due to the limit, not just files that hit rewrite errors. Either way, this output would let users react to unexpected volumes and prevent a long-term buildup of uncompacted files, addressing the scenario where older files remain unoptimized simply because they were part of a backlog the fixed daily compaction couldn't clear.
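For what it's worth, here is a rough sketch of how such a skipped count could be computed next to the selection logic in `toGroupStream`. Everything below is hypothetical and not existing Iceberg API: the helper name `countSkippedFiles`, the accounting, and the idea of surfacing the value are only meant to illustrate the intent.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;

// Hypothetical helper for RewriteDataFilesSparkAction: walk the same candidate
// groups that toGroupStream sees and count how many eligible files do NOT fit
// under the maxFilesToRewrite cap.
private int countSkippedFiles(
    Map<StructLike, List<List<FileScanTask>>> groupsByPartition, int maxFilesToRewrite) {
  AtomicInteger selected = new AtomicInteger(0);
  AtomicInteger skipped = new AtomicInteger(0);

  groupsByPartition.values().stream()
      .flatMap(List::stream)
      .forEach(
          fileScanTasks -> {
            int remaining = Math.max(maxFilesToRewrite - selected.get(), 0);
            if (fileScanTasks.size() <= remaining) {
              // the whole scan group still fits under the cap
              selected.addAndGet(fileScanTasks.size());
            } else {
              // whatever does not fit is skipped, not failed; today this number
              // is invisible to the user
              selected.addAndGet(remaining);
              skipped.addAndGet(fileScanTasks.size() - remaining);
            }
          });

  return skipped.get();
}
```

Whatever form it takes, reporting that number alongside `failed_data_files_count` in the procedure output would make a persistent backlog visible in monitoring instead of letting it age silently.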