RussellSpitzer commented on code in PR #12824:
URL: https://github.com/apache/iceberg/pull/12824#discussion_r2067497937
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -407,15 +409,49 @@ private Builder doExecuteWithPartialProgress(
   Stream<RewriteFileGroup> toGroupStream(
       RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
-    return groupsByPartition.entrySet().stream()
+    if (maxFilesToRewrite == null) {
+      return groupsByPartition.entrySet().stream()
+          .filter(e -> !e.getValue().isEmpty())
+          .flatMap(
+              e -> {
+                StructLike partition = e.getKey();
+                List<List<FileScanTask>> scanGroups = e.getValue();
+                return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+              })
+          .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+    }
+
+    LOG.info(
+        "Max files rewrite options provided for table {} with max file re-write value : {}",
+        table.name(),
+        maxFilesToRewrite);
+
+    List<RewriteFileGroup> selectedFileGroups = Lists.newArrayList();
+    AtomicInteger fileCountRunner = new AtomicInteger(0);
+
+    groupsByPartition.entrySet().stream()
+        .parallel()
         .filter(e -> !e.getValue().isEmpty())
-        .flatMap(
-            e -> {
-              StructLike partition = e.getKey();
-              List<List<FileScanTask>> scanGroups = e.getValue();
-              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
-            })
-        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
+        .forEach(
+            entry -> {
+              StructLike partition = entry.getKey();
+              entry
+                  .getValue()
+                  .forEach(
+                      fileScanTasks -> {
+                        if (fileCountRunner.get() < maxFilesToRewrite) {
+                          int remainingSize = maxFilesToRewrite - fileCountRunner.get();
Review Comment:
I'm still struggling a bit to figure out what class of users this is.
They know they need to set a max file count to get shorter compactions.
They aren't running compaction frequently enough to compact their entire
dataset.
They also have a specific subset of files that should be prioritized over
the partitions that are getting compacted, but they don't know it.
The real sticking point for me is still: why wouldn't this user just compact
more frequently? Would we really be solving their problem if the number of
uncompacted files keeps growing monotonically over time?
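
For concreteness, here's a rough sketch of how I understand this feature would
be used from the actions API. This is my own illustration, not code from the
PR, and the option key "max-files-to-rewrite" is an assumption about what the
PR exposes:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteWithFileCap {
      // Hypothetical invocation of the proposed per-run cap on rewritten files.
      // The option key "max-files-to-rewrite" is an assumption; the PR may name it differently.
      public static void run(SparkSession spark, Table table) {
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .option("max-files-to-rewrite", "5000") // stop selecting groups once ~5000 files are queued
            .execute();
      }
    }

Even with such a cap, the question above stands: if each run rewrites at most
N files but more than N files accumulate between runs, the backlog keeps
growing.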
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]