wypoon commented on code in PR #9888:
URL: https://github.com/apache/iceberg/pull/9888#discussion_r1720445018


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -134,50 +138,81 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
   }
 
   private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
 
     private final Map<Long, Integer> snapshotOrdinals;
+    private final Map<Long, Snapshot> addedToChangedSnapshots;
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+    CreateDataFileChangeTasks(
+        Deque<Snapshot> snapshots, Map<Long, Snapshot> 
addedToChangedSnapshots) {
       this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+      this.addedToChangedSnapshots = addedToChangedSnapshots;
     }
 
     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext 
context) {
 
-      return CloseableIterable.transform(
-          entries,
-          entry -> {
-            long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-            switch (entry.status()) {
-              case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              default:
-                throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-            }
-          });
+      return CloseableIterable.filter(
+          CloseableIterable.transform(
+              entries,
+              entry -> {
+                long snapshotId = entry.snapshotId();
+                Snapshot snapshot = addedToChangedSnapshots.get(snapshotId);
+                long commitSnapshotId = snapshot.snapshotId();
+                int changeOrdinal = 
snapshotOrdinals.get(snapshot.snapshotId());
+                DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+                DeleteFile[] deleteFiles = 
context.deletes().forDataFile(dataFile);
+                List<DeleteFile> addedDeletes = Lists.newArrayList();
+                List<DeleteFile> existingDeletes = Lists.newArrayList();
+                for (DeleteFile file : deleteFiles) {
+                  if (file.dataSequenceNumber() == snapshot.sequenceNumber()) {
+                    addedDeletes.add(file);
+                  } else {
+                    existingDeletes.add(file);
+                  }
+                }
+
+                switch (entry.status()) {
+                  case ADDED:
+                    if (snapshotId == commitSnapshotId) {
+                      return new BaseAddedRowsScanTask(
+                          changeOrdinal,
+                          commitSnapshotId,
+                          dataFile,
+                          addedDeletes.toArray(new DeleteFile[0]),
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else if (deleteFiles.length > 0) {
+                      return new BaseDeletedRowsScanTask(
+                          changeOrdinal,
+                          commitSnapshotId,
+                          dataFile,
+                          addedDeletes.toArray(new DeleteFile[0]),
+                          existingDeletes.toArray(new DeleteFile[0]),
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else {
+                      return null;

Review Comment:
   Returning `null` here and filtering it out afterwards is one way! ;-)
   In my implementation, I instead return a placeholder
`DummyChangelogScanTask`, which is then filtered out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to