wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1717121087


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +131,155 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
+      Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
+    Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+    for (Snapshot snapshot : snapshots) {
+      Iterable<DeleteFile> deleteFiles = 
snapshot.addedDeleteFiles(table().io());
+      for (DeleteFile deleteFile : deleteFiles) {
+        deleteFileToSnapshotOrdinal.put(
+            deleteFile.path().toString(), 
snapshotOrdinals.get(snapshot.snapshotId()));
+      }
+    }
+
+    return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+    private int changeOrdinal;
+    private long commitSnapshotId;
 
+    DummyChangelogScanTask(int changeOrdinal, long commitSnapshotId) {
+      this.changeOrdinal = changeOrdinal;
+      this.commitSnapshotId = commitSnapshotId;
+    }
+
+    @Override
+    public ChangelogOperation operation() {
+      return ChangelogOperation.DELETE;
+    }
+
+    @Override
+    public int changeOrdinal() {
+      return changeOrdinal;
+    }
+
+    @Override
+    public long commitSnapshotId() {
+      return commitSnapshotId;
+    }
+  }
+
+  private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
+    private final long snapshotId;
+    private final int changeOrdinal;
     private final Map<Long, Integer> snapshotOrdinals;
+    private final Map<String, Integer> deleteFileToSnapshotOrdinal;
+
+    CreateDataFileChangeTasks(
+        long snapshotId,
+        Map<Long, Integer> snapshotOrdinals,
+        Map<String, Integer> deleteFileToSnapshotOrdinal) {
+      this.snapshotId = snapshotId;
+      this.snapshotOrdinals = snapshotOrdinals;
+      this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+      this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+    }
+
+    private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+          .toArray(DeleteFile.class);
+    }
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+          .toArray(DeleteFile.class);
     }
 
     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext 
context) {
 
-      return CloseableIterable.transform(
-          entries,
-          entry -> {
-            long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-            switch (entry.status()) {
-              case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              default:
-                throw new IllegalArgumentException("Unexpected entry status: " 
+ entry.status());
-            }
-          });
+      CloseableIterable<ChangelogScanTask> tasks =
+          CloseableIterable.transform(
+              entries,
+              entry -> {
+                long entrySnapshotId = entry.snapshotId();
+                DataFile dataFile = 
entry.file().copy(context.shouldKeepStats());
+                DeleteFile[] addedDeleteFiles = 
filterAdded(context.deletes().forEntry(entry));
+
+                switch (entry.status()) {
+                  case ADDED:
+                    if (entrySnapshotId == snapshotId) {
+                      return new BaseAddedRowsScanTask(
+                          changeOrdinal,
+                          snapshotId,
+                          dataFile,
+                          addedDeleteFiles,
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else {
+                      // the data file is added before the snapshot we're 
processing
+                      if (addedDeleteFiles.length == 0) {
+                        return new DummyChangelogScanTask(changeOrdinal, 
snapshotId);
+                      } else {
+                        return new BaseDeletedRowsScanTask(
+                            changeOrdinal,
+                            snapshotId,
+                            dataFile,
+                            addedDeleteFiles,
+                            // not used
+                            filterExisting(context.deletes().forEntry(entry)),
+                            context.schemaAsString(),
+                            context.specAsString(),
+                            context.residuals());
+                      }
+                    }
+
+                  case DELETED:
+                    if (entrySnapshotId == snapshotId) {
+                      return new BaseDeletedDataFileScanTask(
+                          changeOrdinal,
+                          snapshotId,
+                          dataFile,
+                          filterExisting(context.deletes().forEntry(entry)),
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else {
+                      return new DummyChangelogScanTask(changeOrdinal, 
snapshotId);
+                    }
+
+                  case EXISTING:
+                    if (addedDeleteFiles.length == 0) {
+                      return new DummyChangelogScanTask(changeOrdinal, 
snapshotId);
+                    } else {
+                      return new BaseDeletedRowsScanTask(

Review Comment:
   Note that a BaseDeletedRowsScanTask is created only if there are delete 
files *added in that snapshot* that may apply to the data file.
   Suppose an equality delete file is added in that snapshot. Whether a full 
table scan then happens for that snapshot depends on the equality deletes and on 
how well `DeleteFileIndex.canContainEqDeletesForFile(DataFile, 
EqualityDeleteFile)` is able to filter them out for each data file. If the 
equality deletes are scoped to a partition, or are global but can be excluded 
using column value ranges, a full table scan may be avoided. But yes, depending 
on the equality deletes, a full table scan could happen.
   Unfortunately, I don't see an alternative.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to