aokolnychyi commented on code in PR #11254:
URL: https://github.com/apache/iceberg/pull/11254#discussion_r1800023409


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -82,11 +82,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
 
   // update data
-  private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
-  private final DataFileSet newDataFiles = DataFileSet.create();
-  private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
+  private final Map<PartitionSpec, DataFileSet> newDataFilesBySpec = Maps.newHashMap();

Review Comment:
   Hmm... When did we start using `PartitionSpec` as keys? This makes all 
operations more expensive. We always used `Integer` when indexing by specs, 
like `PartitionMap` or even `newDeleteFilesBySpec` below.
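
To illustrate what I mean by indexing by spec ID, here is a minimal sketch. `SpecSketch` and `FilesBySpecIdSketch` are hypothetical stand-ins, not Iceberg classes; the assumption is only that a spec can be resolved from its ID once and that the map itself hashes the `Integer` ID rather than the whole spec.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a partition spec; not the Iceberg class.
final class SpecSketch {
  private final int specId;
  private final List<String> fieldNames;

  SpecSketch(int specId, List<String> fieldNames) {
    this.specId = specId;
    this.fieldNames = fieldNames;
  }

  int specId() {
    return specId;
  }

  List<String> fieldNames() {
    return fieldNames;
  }
}

// Hypothetical index of pending files keyed by spec ID.
final class FilesBySpecIdSketch {
  private final Map<Integer, SpecSketch> specsById = new HashMap<>();
  // Keyed by Integer: each lookup hashes a boxed int, not a full spec object.
  private final Map<Integer, List<String>> filesBySpecId = new HashMap<>();

  void registerSpec(SpecSketch spec) {
    specsById.put(spec.specId(), spec);
  }

  void add(int specId, String fileLocation) {
    // Validate that the spec exists, but do not use it as the map key.
    if (!specsById.containsKey(specId)) {
      throw new IllegalArgumentException("Cannot find partition spec " + specId);
    }
    filesBySpecId.computeIfAbsent(specId, ignored -> new ArrayList<>()).add(fileLocation);
  }

  List<String> files(int specId) {
    return filesBySpecId.getOrDefault(specId, Collections.emptyList());
  }
}
```

The spec is still looked up and validated when a file is added; it is just no longer part of the key.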



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -236,43 +234,41 @@ protected boolean addsDeleteFiles() {
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    if (newDataFiles.add(file)) {
-      PartitionSpec fileSpec = ops.current().spec(file.specId());
-      Preconditions.checkArgument(
-          fileSpec != null,
-          "Cannot find partition spec %s for data file: %s",
-          file.specId(),
-          file.path());
-
+    PartitionSpec fileSpec = ops.current().spec(file.specId());
+    Preconditions.checkArgument(
+        fileSpec != null,
+        "Cannot find partition spec %s for data file: %s",
+        file.specId(),
+        file.location());
+
+    DataFileSet dataFiles =
+        newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> DataFileSet.create());
+    if (dataFiles.add(file)) {
       addedFilesSummary.addedFile(fileSpec, file);
       hasNewDataFiles = true;
-      List<DataFile> dataFiles =
-          newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
-      dataFiles.add(file);
     }
   }
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file) {
     Preconditions.checkNotNull(file, "Invalid delete file: null");
-    add(new DeleteFileHolder(file));
+    add(new PendingDeleteFile(file));
   }
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file, long dataSequenceNumber) {
     Preconditions.checkNotNull(file, "Invalid delete file: null");
-    add(new DeleteFileHolder(file, dataSequenceNumber));
+    add(new PendingDeleteFile(file, dataSequenceNumber));
   }
 
-  private void add(DeleteFileHolder fileHolder) {
-    int specId = fileHolder.deleteFile().specId();
-    PartitionSpec fileSpec = ops.current().spec(specId);
-    List<DeleteFileHolder> deleteFiles =
-        newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
+  private void add(PendingDeleteFile deleteFile) {
+    int specId = deleteFile.specId();
+    DeleteFileSet deleteFiles =
+        newDeleteFilesBySpec.computeIfAbsent(specId, s -> DeleteFileSet.create());

Review Comment:
   I know we called it `s` before, but let's use either `key` or `ignored` for 
consistency.



##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -772,17 +773,139 @@ protected static class DeleteFileHolder {
      *
      * @param deleteFile delete file
      */
-    DeleteFileHolder(DeleteFile deleteFile) {
+    PendingDeleteFile(DeleteFile deleteFile) {
       this.deleteFile = deleteFile;
       this.dataSequenceNumber = null;
     }
 
-    public DeleteFile deleteFile() {
-      return deleteFile;
-    }
-
+    @Override
     public Long dataSequenceNumber() {
       return dataSequenceNumber;
     }
+
+    @Override
+    public Long fileSequenceNumber() {
+      return deleteFile.fileSequenceNumber();
+    }
+
+    @Override
+    public DeleteFile copy() {

Review Comment:
   I think these `copy()` methods break this class as they expose the 
underlying data sequence number of the wrapped delete file. Can we simply throw 
`UnsupportedOperationException`?
   
   An alternative option is to re-wrap them after copying.
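
A rough sketch of the re-wrapping option, using hypothetical types (`DeleteFileSketch`, `PlainDeleteFileSketch`, `PendingDeleteFileSketch`) rather than the real Iceberg interfaces. The assumption is that the wrapper overrides the data sequence number and must keep doing so after `copy()`.

```java
// Hypothetical, trimmed-down view of a delete file; not the Iceberg interface.
interface DeleteFileSketch {
  Long dataSequenceNumber();

  DeleteFileSketch copy();
}

// Hypothetical plain file that carries its own sequence number.
final class PlainDeleteFileSketch implements DeleteFileSketch {
  private final Long dataSequenceNumber;

  PlainDeleteFileSketch(Long dataSequenceNumber) {
    this.dataSequenceNumber = dataSequenceNumber;
  }

  @Override
  public Long dataSequenceNumber() {
    return dataSequenceNumber;
  }

  @Override
  public DeleteFileSketch copy() {
    return new PlainDeleteFileSketch(dataSequenceNumber);
  }
}

// Hypothetical wrapper that hides the wrapped file's sequence number.
final class PendingDeleteFileSketch implements DeleteFileSketch {
  private final DeleteFileSketch wrapped;
  private final Long dataSequenceNumber; // null means "not assigned yet"

  PendingDeleteFileSketch(DeleteFileSketch wrapped, Long dataSequenceNumber) {
    this.wrapped = wrapped;
    this.dataSequenceNumber = dataSequenceNumber;
  }

  @Override
  public Long dataSequenceNumber() {
    return dataSequenceNumber;
  }

  @Override
  public DeleteFileSketch copy() {
    // Returning wrapped.copy() directly would leak the wrapped sequence number.
    // Re-wrap the copy so the override survives. The simpler alternative is to
    // throw new UnsupportedOperationException() here instead.
    return new PendingDeleteFileSketch(wrapped.copy(), dataSequenceNumber);
  }
}
```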



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -236,43 +234,41 @@ protected boolean addsDeleteFiles() {
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    if (newDataFiles.add(file)) {
-      PartitionSpec fileSpec = ops.current().spec(file.specId());
-      Preconditions.checkArgument(
-          fileSpec != null,
-          "Cannot find partition spec %s for data file: %s",
-          file.specId(),
-          file.path());
-
+    PartitionSpec fileSpec = ops.current().spec(file.specId());

Review Comment:
   What about a helper method to load specs? We use the same statement in so 
many places.
   
   ```
   private PartitionSpec spec(int specId) {
     return ops.current().spec(specId);
   }
   ```



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -162,10 +160,10 @@ protected Expression rowFilter() {
 
   protected List<DataFile> addedDataFiles() {
     return ImmutableList.copyOf(

Review Comment:
   Not related to this PR but I wonder whether the extra copy is justified 
here. If we are using Java 11 in the build, we can probably just switch to 
`Collectors.toUnmodifiableList()`. Should be addressed separately, for sure.
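
For reference, this is roughly the shape I have in mind. It is a sketch over plain `java.util` types with a hypothetical `UnmodifiableListSketch` class, not the actual method body, and it assumes Java 10+ for `Collectors.toUnmodifiableList()`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; the real code works with data files, not strings.
final class UnmodifiableListSketch {
  private UnmodifiableListSketch() {}

  // Flattens per-spec file lists straight into an unmodifiable list,
  // without building an intermediate list and then copying it.
  static List<String> flatten(Map<Integer, List<String>> filesBySpecId) {
    return filesBySpecId.values().stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toUnmodifiableList());
  }
}
```

One difference to keep in mind: this collector rejects null elements, which should not matter if nulls are already checked when files are added.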



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -236,43 +234,41 @@ protected boolean addsDeleteFiles() {
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    if (newDataFiles.add(file)) {
-      PartitionSpec fileSpec = ops.current().spec(file.specId());
-      Preconditions.checkArgument(
-          fileSpec != null,
-          "Cannot find partition spec %s for data file: %s",
-          file.specId(),
-          file.path());
-
+    PartitionSpec fileSpec = ops.current().spec(file.specId());
+    Preconditions.checkArgument(
+        fileSpec != null,
+        "Cannot find partition spec %s for data file: %s",
+        file.specId(),
+        file.location());
+
+    DataFileSet dataFiles =
+        newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> DataFileSet.create());
+    if (dataFiles.add(file)) {
       addedFilesSummary.addedFile(fileSpec, file);
       hasNewDataFiles = true;
-      List<DataFile> dataFiles =
-          newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
-      dataFiles.add(file);
     }
   }
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file) {
     Preconditions.checkNotNull(file, "Invalid delete file: null");
-    add(new DeleteFileHolder(file));
+    add(new PendingDeleteFile(file));
   }
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file, long dataSequenceNumber) {
     Preconditions.checkNotNull(file, "Invalid delete file: null");
-    add(new DeleteFileHolder(file, dataSequenceNumber));
+    add(new PendingDeleteFile(file, dataSequenceNumber));
   }
 
-  private void add(DeleteFileHolder fileHolder) {
-    int specId = fileHolder.deleteFile().specId();
-    PartitionSpec fileSpec = ops.current().spec(specId);
-    List<DeleteFileHolder> deleteFiles =
-        newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
+  private void add(PendingDeleteFile deleteFile) {

Review Comment:
   Can we make this `add` as consistent with `add(DataFile)` as possible?
   Same temp var names, validation, and formatting.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -974,7 +970,8 @@ private List<ManifestFile> newDataFilesAsManifests() {
       newDataFilesBySpec.forEach(
           (dataSpec, dataFiles) -> {
             List<ManifestFile> newDataManifests =
-                writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
+                writeDataManifests(
+                    Lists.newArrayList(dataFiles), newDataFilesDataSequenceNumber, dataSpec);

Review Comment:
   This conversion is redundant now.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1005,7 +1002,8 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops.current().spec(specId);
-            List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
+            List<ManifestFile> newDeleteManifests =
+                writeDeleteManifests(Lists.newArrayList(deleteFiles), spec);

Review Comment:
   Here as well.



##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -595,20 +596,20 @@ private List<ManifestFile> writeDataFileGroup(
   }
 
   protected List<ManifestFile> writeDeleteManifests(
-      Collection<DeleteFileHolder> files, PartitionSpec spec) {
+      Collection<DeleteFile> files, PartitionSpec spec) {
     return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
   }
 
   private List<ManifestFile> writeDeleteFileGroup(
-      Collection<DeleteFileHolder> files, PartitionSpec spec) {
+      Collection<DeleteFile> files, PartitionSpec spec) {
     RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
 
     try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
-      for (DeleteFileHolder file : files) {

Review Comment:
   I agree with this logic but I wonder whether we want to assert we have 
`PendingDeleteFile` here as a sanity check. We added this wrapper to ensure we 
never rely on passed sequence number info on `DeleteFile`.
   
   What do you think, @nastra?
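
Something along these lines is what I mean by a sanity check. The types here (`WrittenFileSketch`, `PendingWrittenFileSketch`, `RawWrittenFileSketch`, `DeleteWriterSketch`) are hypothetical, and the real code would probably use `Preconditions.checkArgument` instead of a manual throw.

```java
import java.util.Collection;

// Hypothetical minimal file abstraction; not the Iceberg interface.
interface WrittenFileSketch {
  String location();
}

// Hypothetical wrapper type the writer expects to receive.
final class PendingWrittenFileSketch implements WrittenFileSketch {
  private final String location;

  PendingWrittenFileSketch(String location) {
    this.location = location;
  }

  @Override
  public String location() {
    return location;
  }
}

// Hypothetical unwrapped file that should never reach the writer.
final class RawWrittenFileSketch implements WrittenFileSketch {
  private final String location;

  RawWrittenFileSketch(String location) {
    this.location = location;
  }

  @Override
  public String location() {
    return location;
  }
}

final class DeleteWriterSketch {
  private DeleteWriterSketch() {}

  // Returns the number of files accepted; fails fast on any unwrapped file.
  static int writeAll(Collection<? extends WrittenFileSketch> files) {
    int written = 0;
    for (WrittenFileSketch file : files) {
      if (!(file instanceof PendingWrittenFileSketch)) {
        throw new IllegalArgumentException(
            "Invalid delete file: must be pending: " + file.location());
      }
      written++;
    }
    return written;
  }
}
```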



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

