wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1764249470


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +128,124 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+    public static final DummyChangelogScanTask INSTANCE = new 
DummyChangelogScanTask();
 
-    private final Map<Long, Integer> snapshotOrdinals;
+    private DummyChangelogScanTask() {}
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    @Override
+    public ChangelogOperation operation() {
+      return ChangelogOperation.DELETE;
+    }
+
+    @Override
+    public int changeOrdinal() {
+      return 0;
+    }
+
+    @Override
+    public long commitSnapshotId() {
+      return 0L;
+    }
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
+    private final long snapshotId;
+    private final long sequenceNumber;
+    private final int changeOrdinal;
+
+    CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int 
changeOrdinal) {
+      this.snapshotId = snapshotId;
+      this.sequenceNumber = sequenceNumber;
+      this.changeOrdinal = changeOrdinal;
     }
 
     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext 
context) {
 
-      return CloseableIterable.transform(
-          entries,
-          entry -> {
-            long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-            switch (entry.status()) {
-              case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              default:
-                throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-            }
-          });
+      CloseableIterable<ChangelogScanTask> tasks =
+          CloseableIterable.transform(
+              entries,
+              entry -> {
+                long entrySnapshotId = entry.snapshotId();
+                DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+                DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
+                List<DeleteFile> added = Lists.newArrayList();
+                List<DeleteFile> existing = Lists.newArrayList();
+                for (DeleteFile deleteFile : deleteFiles) {
+                  if (sequenceNumber == deleteFile.dataSequenceNumber()) {
+                    added.add(deleteFile);
+                  } else {
+                    existing.add(deleteFile);
+                  }
+                }
+                DeleteFile[] addedDeleteFiles = added.toArray(new 
DeleteFile[0]);
+                DeleteFile[] existingDeleteFiles = existing.toArray(new 
DeleteFile[0]);

Review Comment:
   @pvary this is the bug I mentioned needing to fix before pushing a change 
with additional tests in `TestBaseIncrementalChangelogScan`.
   In that test class, `testDeletingDataFileWithExistingDeletes` and 
`testDeletingRowsInDataFileWithExistingDeletes` obscured the bug when the 
`IncrementalChangelogScan` ran from `snap1` to `snap3`; with a scan from 
`snap2` to `snap3`, those tests revealed the bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to