sririshindra commented on code in PR #12824:
URL: https://github.com/apache/iceberg/pull/12824#discussion_r2067539142


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -407,15 +409,49 @@ private Builder doExecuteWithPartialProgress(
 
   Stream<RewriteFileGroup> toGroupStream(
       RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
-    return groupsByPartition.entrySet().stream()
+    if (maxFilesToRewrite == null) {
+      return groupsByPartition.entrySet().stream()
+          .filter(e -> !e.getValue().isEmpty())
+          .flatMap(
+              e -> {
+                StructLike partition = e.getKey();
+                List<List<FileScanTask>> scanGroups = e.getValue();
+                return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+              })
+          .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+    }
+
+    LOG.info(
+        "Max files rewrite options provided for table {} with max file re-write value : {}",
+        table.name(),
+        maxFilesToRewrite);
+
+    List<RewriteFileGroup> selectedFileGroups = Lists.newArrayList();
+    AtomicInteger fileCountRunner = new AtomicInteger(0);
+
+    groupsByPartition.entrySet().stream()
+        .parallel()
         .filter(e -> !e.getValue().isEmpty())
-        .flatMap(
-            e -> {
-              StructLike partition = e.getKey();
-              List<List<FileScanTask>> scanGroups = e.getValue();
-              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
-            })
-        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+        .forEach(
+            entry -> {
+              StructLike partition = entry.getKey();
+              entry
+                  .getValue()
+                  .forEach(
+                      fileScanTasks -> {
+                        if (fileCountRunner.get() < maxFilesToRewrite) {
+                          int remainingSize = maxFilesToRewrite - fileCountRunner.get();

Review Comment:
   @RussellSpitzer
   
   You're right, I might be overcomplicating the user profile a bit. Let's 
simplify the scenario and focus on the outcome.
   
   Consider a user who *does* set `maxFilesToRewrite` based on their *average* daily data volume. Say they typically write around 500 files a day and set `maxFilesToRewrite` to 700 to cover the daily volume plus a small buffer, and they schedule compaction to run once daily. Most days this works fine; any slight overflow gets picked up the next day.
   
   The issue arises during unpredictable data bursts, like a "Black Friday" 
event or an unexpected spike. If they suddenly get 10,000 files written on one 
day, their compaction job, still capped at rewriting 700 files by 
`maxFilesToRewrite`, will skip a massive number of newly written files (over 
9,000 in this example). These skipped files form a significant backlog.
   
   With only 700 files compacted per day, and roughly 500 new files still arriving daily, the backlog shrinks by only about 200 files per day, so clearing 9,000+ files takes well over a month. During that time, the thousands of files written on the burst day, plus many written on subsequent "normal" days while the backlog is worked off, simply sit there uncompacted. They are effectively "left behind" because the `maxFilesToRewrite` limit prevents the compaction job from even considering them in a given run. The skipped files are mostly the most recently written ones, and they keep aging while they wait to be picked up by a future, still-limited run. This is the scenario where files can stay "stuck", and grow old, for a prolonged period if the backlog persists.
   
   Maybe we should be thinking about giving users visibility into this backlog 
caused by the limit. Currently, the `rewrite_data_files` procedure gives 
`failed_data_files_count` as part of its output. As I understand it, this 
metric captures files that *failed* to be rewritten when 
`partial-progress.enabled` is true, but I don't believe it captures the files 
that were simply *skipped* because the `maxFilesToRewrite` limit was reached 
before they could be considered.
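
   To make this concrete, here is a rough, self-contained sketch of the accounting I have in mind (plain Java, with `String`s standing in for `FileScanTask`s and a made-up helper name; it is not tied to the actual `RewriteDataFilesSparkAction` fields):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Minimal sketch, not Iceberg's API: Strings stand in for FileScanTasks.
   public class SkippedFileAccounting {

     /** Fills 'selected' with up to maxFilesToRewrite files and returns how many were skipped. */
     static int selectUpToLimit(
         List<List<String>> scanGroups, int maxFilesToRewrite, List<List<String>> selected) {
       int selectedFiles = 0;
       int skippedFiles = 0;
       for (List<String> group : scanGroups) {
         int remaining = maxFilesToRewrite - selectedFiles;
         if (remaining <= 0) {
           // Limit already reached: these files are skipped, not failed.
           skippedFiles += group.size();
         } else if (group.size() <= remaining) {
           selected.add(group);
           selectedFiles += group.size();
         } else {
           // Take part of the group up to the limit; the rest counts as skipped.
           selected.add(new ArrayList<>(group.subList(0, remaining)));
           selectedFiles += remaining;
           skippedFiles += group.size() - remaining;
         }
       }
       return skippedFiles;
     }

     public static void main(String[] args) {
       List<List<String>> groups =
           List.of(List.of("f1", "f2", "f3"), List.of("f4", "f5"), List.of("f6"));
       List<List<String>> selected = new ArrayList<>();
       int skipped = selectUpToLimit(groups, 4, selected);
       System.out.println("selected groups: " + selected); // [[f1, f2, f3], [f4]]
       System.out.println("skipped files:   " + skipped); // 2
     }
   }
   ```

   The point is only that the selection loop already knows, at the moment it stops, exactly how many files it is leaving behind, so surfacing that number should be cheap.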
   
   If the procedure's output included a metric showing how many eligible files were *skipped* because the `maxFilesToRewrite` limit was hit, users could monitor that count (a rough usage sketch follows the list below). If it is consistently high, or jumps after a data burst, they'd have clear data indicating they need to:
   
   1.  Temporarily increase `maxFilesToRewrite`.
   2.  Run compaction more frequently until the skipped count drops.
   3.  Or implement more dynamic scheduling logic based on the skipped count.
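
   For example, a scheduled job could key those decisions off the procedure output. This is only a sketch under assumptions: `skipped_data_files_count` is a proposed column that does not exist today, the `max-files-to-rewrite` option key is my guess at how the new limit would be spelled (the final PR may differ), and the catalog/table names are placeholders.

   ```java
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;

   public class CompactionBacklogCheck {

     public static void main(String[] args) {
       SparkSession spark =
           SparkSession.builder().appName("compaction-backlog-check").getOrCreate();

       // Assumed option key and proposed output column; neither exists in Iceberg today.
       Row result =
           spark
               .sql(
                   "CALL my_catalog.system.rewrite_data_files("
                       + "table => 'db.events', "
                       + "options => map('max-files-to-rewrite', '700'))")
               .first();

       int skipped = result.getInt(result.fieldIndex("skipped_data_files_count"));

       if (skipped > 0) {
         // Backlog detected: raise the limit temporarily, schedule another pass,
         // or feed the number into more dynamic scheduling logic.
         System.out.printf("Compaction left %d files behind; consider another pass.%n", skipped);
       }

       spark.stop();
     }
   }
   ```

   The same check could just as well live in whatever orchestration tool drives the daily run; the key thing is that the decision needs a number to key off.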
   
   So perhaps we need to add another column to the procedure's output that specifically reports the number of files left behind because the `maxFilesToRewrite` limit was hit. Alternatively, if `failed_data_files_count` is intended to cover this, we should clarify its definition in the documentation and likely adjust the implementation and add tests so that it correctly reflects files skipped due to the limit, not just files that hit rewrite errors. Either way, that output would let users react to unexpected volumes and prevent a long-term buildup of uncompacted files, i.e. the scenario where older files remain unoptimized simply because they were part of a backlog the fixed daily compaction couldn't clear.


