szehon-ho commented on code in PR #7630:
URL: https://github.com/apache/iceberg/pull/7630#discussion_r1198610502


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -159,26 +161,25 @@ public RewriteDataFiles.Result execute() {
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
     }
   }
 
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {

Review Comment:
   Can't comment on the exact line, but on lines 196-304 below you had initially done some refactoring. I see it was reverted because, as @aokolnychyi noted in this discussion, we can't coerce the partition without changing the behavior: https://github.com/apache/iceberg/pull/7630#discussion_r1197239440
   
   That is correct, but we can still do some refactoring without the coercion. I took a brief look and came up with the following, which keeps the existing logic:
   
   First add the methods:
   ```
   private StructLikeMap<List<FileScanTask>> groupByPartition(
       StructType partitionType, Iterable<FileScanTask> tasks) {
     StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
     StructLike emptyStruct = GenericRecord.create(partitionType);

     for (FileScanTask task : tasks) {
       // If a task uses an incompatible partition spec the data inside could contain values
       // which belong to multiple partitions in the current spec. Treating all such files as
       // un-partitioned and grouping them together helps to minimize new files made.
       StructLike taskPartition =
           task.file().specId() == table.spec().specId()
               ? task.file().partition()
               : emptyStruct;

       List<FileScanTask> files = filesByPartition.get(taskPartition);
       if (files == null) {
         files = Lists.newArrayList();
       }

       files.add(task);
       filesByPartition.put(taskPartition, files);
     }
     return filesByPartition;
   }

   private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
       StructLikeMap<List<FileScanTask>> filesByPartition) {
     return filesByPartition.transformValues(this::planFileGroups);
   }

   private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
     return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
   }
   ```
   
   and then we can make this method like:
   ```
   Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table
             .newScan()
             .useSnapshot(startingSnapshotId)
             .filter(filter)
             .ignoreResiduals()
             .planFiles();

     try {
       StructType partitionType = table.spec().partitionType();
       StructLikeMap<List<FileScanTask>> filesByPartition =
           groupByPartition(partitionType, fileScanTasks);
       return fileGroupsByPartition(filesByPartition);
     } finally {
       try {
         fileScanTasks.close();
       } catch (IOException io) {
         LOG.error("Cannot properly close file iterable while planning for rewrite", io);
       }
     }
   }
   ``` 
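   
   As a small aside (not required for this change): the get/null-check/put in `groupByPartition` could likely be collapsed with `Map#computeIfAbsent`; this is just a sketch, assuming `StructLikeMap` picks up the default implementation from `Map`:
   ```
   for (FileScanTask task : tasks) {
     StructLike taskPartition =
         task.file().specId() == table.spec().specId()
             ? task.file().partition()
             : emptyStruct;

     // creates the per-partition list on first use, replacing the explicit get/put
     filesByPartition.computeIfAbsent(taskPartition, ignored -> Lists.newArrayList()).add(task);
   }
   ```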
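   
   Also just a thought: since `CloseableIterable` is a `Closeable`, the explicit `finally` could become a try-with-resources. That does change the error handling (a close failure would be thrown instead of only logged), so the current version may well be preferable; sketch only:
   ```
   try (CloseableIterable<FileScanTask> fileScanTasks =
       table.newScan().useSnapshot(startingSnapshotId).filter(filter).ignoreResiduals().planFiles()) {
     StructType partitionType = table.spec().partitionType();
     return fileGroupsByPartition(groupByPartition(partitionType, fileScanTasks));
   } catch (IOException io) {
     // close() failures surface here instead of being swallowed after logging
     throw new UncheckedIOException("Cannot properly close file iterable while planning for rewrite", io);
   }
   ```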


