pvary commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2445329999


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +156,464 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
+  /**
+   * Builds a delete file index for existing deletes that were present before the start snapshot.
+   * These deletes should be applied to data files but should not generate DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    // Prune manifests based on partition filter to avoid processing irrelevant manifests
+    List<ManifestFile> prunedManifests = pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    // Load delete files with caching to avoid redundant manifest parsing
+    List<DeleteFile> deleteFiles = loadDeleteFilesWithCache(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Builds per-snapshot delete file indexes for newly added delete files in each changelog
+   * snapshot. These deletes should generate DELETE changelog rows. Uses caching to avoid re-parsing
+   * manifests.
+   */
+  private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> changelogSnapshots) {
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newHashMap();
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      List<ManifestFile> snapshotDeleteManifests = snapshot.deleteManifests(table().io());
+      if (snapshotDeleteManifests.isEmpty()) {
+        addedDeletesBySnapshot.put(
+            snapshot.snapshotId(), DeleteFileIndex.builderFor(ImmutableList.of()).build());

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to