aokolnychyi commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2742758269


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +199,526 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
+  /**
+   * Builds a delete file index for existing deletes that were present before 
the start snapshot.
+   * These deletes should be applied to data files but should not generate 
DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long 
fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return DeleteFileIndex.emptyIndex();
+    }
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", 
fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = 
fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Prune manifests based on partition filter to avoid processing 
irrelevant manifests
+    List<ManifestFile> prunedManifests = 
pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Load delete files from manifests
+    Iterable<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Wrapper method that tracks build calls and caches the result for reuse. 
This ensures we only
+   * build the index once even if called from multiple places.
+   */
+  private DeleteFileIndex buildExistingDeleteIndexTracked(Long 
fromSnapshotIdExclusive) {
+    if (cachedExistingDeleteIndex != null) {
+      return cachedExistingDeleteIndex;
+    }
+    existingDeleteIndexBuildCallCount++;
+    cachedExistingDeleteIndex = 
buildExistingDeleteIndex(fromSnapshotIdExclusive);
+    return cachedExistingDeleteIndex;
+  }
+
+  // Visible for testing
+  int getExistingDeleteIndexBuildCallCount() {
+    return existingDeleteIndexBuildCallCount;
+  }
+
+  // Visible for testing
+  boolean wasExistingDeleteIndexBuilt() {
+    return existingDeleteIndexBuildCallCount > 0;
+  }
+
+  /**
+   * Builds per-snapshot delete file indexes for newly added delete files in 
each changelog
+   * snapshot. These deletes should generate DELETE changelog rows. Uses 
caching to avoid re-parsing
+   * manifests.
+   */
+  private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> 
changelogSnapshots) {
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = 
Maps.newConcurrentMap();
+    Tasks.foreach(changelogSnapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutor())
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to build delete index for snapshot {}", 
snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              List<ManifestFile> snapshotDeleteManifests = 
snapshot.deleteManifests(table().io());
+              if (snapshotDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+                return;
+              }
+
+              // Filter to only include delete files added in this snapshot
+              List<ManifestFile> addedDeleteManifests =
+                  snapshotDeleteManifests.stream()
+                      .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+                      .collect(Collectors.toUnmodifiableList());
+
+              if (addedDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+              } else {
+                // Load delete files from manifests
+                Iterable<DeleteFile> deleteFiles = 
loadDeleteFiles(addedDeleteManifests);
+
+                DeleteFileIndex index =
+                    DeleteFileIndex.builderFor(deleteFiles)
+                        .specsById(table().specs())
+                        .caseSensitive(isCaseSensitive())
+                        .build();
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), index);
+              }
+            });
+    return addedDeletesBySnapshot;
+  }
+
+  /**
+   * Plans tasks for EXISTING data files that are affected by newly added 
delete files. These files
+   * were not added or deleted in the changelog snapshot range, but have new 
delete files applied to
+   * them.
+   */
+  private CloseableIterable<ChangelogScanTask> planDeletedRowsTasks(
+      Deque<Snapshot> changelogSnapshots,
+      DeleteFileIndex existingDeleteIndex,
+      Map<Long, DeleteFileIndex> addedDeletesBySnapshot,
+      Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Integer> snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
+    List<ChangelogScanTask> tasks = Lists.newArrayList();
+
+    // Build a map of file statuses for each snapshot
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
+        buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds);
+
+    // Accumulate actual DeleteFile entries chronologically
+    List<DeleteFile> accumulatedDeletes = Lists.newArrayList();
+
+    // Start with deletes from before the changelog range
+    if (!existingDeleteIndex.isEmpty()) {
+      for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) {
+        accumulatedDeletes.add(df);
+      }
+    }
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      DeleteFileIndex addedDeleteIndex = 
addedDeletesBySnapshot.get(snapshot.snapshotId());
+      if (addedDeleteIndex.isEmpty()) {
+        continue;
+      }
+
+      DeleteFileIndex cumulativeDeleteIndex = 
buildDeleteIndex(accumulatedDeletes);
+
+      // Process data files for this snapshot
+      // Use a local set per snapshot to track processed files
+      Set<String> alreadyProcessedPaths = Sets.newHashSet();
+      processSnapshotForDeletedRowsTasks(
+          snapshot,
+          addedDeleteIndex,
+          cumulativeDeleteIndex,
+          fileStatusBySnapshot.get(snapshot.snapshotId()),
+          alreadyProcessedPaths,
+          snapshotOrdinals,
+          tasks);
+
+      // Accumulate this snapshot's added deletes for subsequent snapshots
+      for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+        accumulatedDeletes.add(df);
+      }
+    }
+
+    return CloseableIterable.withNoopClose(tasks);
+  }
+
+  /**
+   * Builds a map of file statuses for each snapshot, tracking which files 
were added or deleted in
+   * each snapshot.
+   */
+  private Map<Long, Map<String, ManifestEntry.Status>> 
buildFileStatusBySnapshot(
+      Deque<Snapshot> changelogSnapshots, Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot = 
Maps.newHashMap();
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      Map<String, ManifestEntry.Status> fileStatuses = Maps.newHashMap();
+
+      List<ManifestFile> changedDataManifests =

Review Comment:
   Shouldn't we scan all data manifests for potentially impacted files when a 
new delete gets added?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to