pvary commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2445439546


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +158,473 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
+  /**
+   * Builds a delete file index for existing deletes that were present before 
the start snapshot.
+   * These deletes should be applied to data files but should not generate 
DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(
+      Long fromSnapshotIdExclusive, Map<Long, DeleteFileIndex> 
addedDeletesBySnapshot) {
+    if (fromSnapshotIdExclusive == null) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    // Check if we need existingDeleteIndex for equality deletes
+    boolean needsExistingDeleteIndex = false;
+
+    for (DeleteFileIndex addedDeleteIndex : addedDeletesBySnapshot.values()) {
+      if (!addedDeleteIndex.isEmpty()) {
+        // Check if this snapshot has equality deletes
+        for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+          if (df.content() == FileContent.EQUALITY_DELETES) {
+            needsExistingDeleteIndex = true;
+            break;
+          }
+        }
+        if (needsExistingDeleteIndex) {
+          break;
+        }
+      }
+    }
+
+    if (!needsExistingDeleteIndex) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", 
fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = 
fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    // Prune manifests based on partition filter to avoid processing 
irrelevant manifests
+    List<ManifestFile> prunedManifests = 
pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    // Load delete files from manifests
+    List<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Builds per-snapshot delete file indexes for newly added delete files in 
each changelog
+   * snapshot. These deletes should generate DELETE changelog rows. Uses 
caching to avoid re-parsing
+   * manifests.
+   */
+  private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> 
changelogSnapshots) {
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newHashMap();
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      List<ManifestFile> snapshotDeleteManifests = 
snapshot.deleteManifests(table().io());
+      if (snapshotDeleteManifests.isEmpty()) {
+        addedDeletesBySnapshot.put(
+            snapshot.snapshotId(), 
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+        continue;
+      }
+
+      // Filter to only include delete files added in this snapshot
+      List<ManifestFile> addedDeleteManifests =
+          FluentIterable.from(snapshotDeleteManifests)
+              .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+              .toList();
+
+      if (addedDeleteManifests.isEmpty()) {
+        addedDeletesBySnapshot.put(
+            snapshot.snapshotId(), 
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+      } else {
+        // Load delete files from manifests
+        List<DeleteFile> deleteFiles = loadDeleteFiles(addedDeleteManifests);
+
+        DeleteFileIndex index =
+            DeleteFileIndex.builderFor(deleteFiles)
+                .specsById(table().specs())
+                .caseSensitive(isCaseSensitive())
+                .build();
+        addedDeletesBySnapshot.put(snapshot.snapshotId(), index);
+      }
+    }
+
+    return addedDeletesBySnapshot;
+  }
+
+  /**
+   * Plans tasks for EXISTING data files that are affected by newly added 
delete files. These files
+   * were not added or deleted in the changelog snapshot range, but have new 
delete files applied to
+   * them.
+   */
+  private CloseableIterable<ChangelogScanTask> planDeletedRowsTasks(
+      Deque<Snapshot> changelogSnapshots,
+      DeleteFileIndex existingDeleteIndex,
+      Map<Long, DeleteFileIndex> addedDeletesBySnapshot,
+      Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Integer> snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
+    List<ChangelogScanTask> tasks = Lists.newArrayList();
+
+    // Build a map of file statuses for each snapshot
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
+        buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds);
+
+    // Process snapshots in order, tracking which files have been handled
+    Set<String> alreadyProcessedPaths = Sets.newHashSet();
+
+    // Accumulate actual DeleteFile entries chronologically
+    List<DeleteFile> accumulatedDeletes = Lists.newArrayList();
+
+    // Start with deletes from before the changelog range
+    // Apply partition pruning to only accumulate relevant delete files
+    if (!existingDeleteIndex.isEmpty()) {
+      for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) {
+        if (partitionMatchesFilter(df)) {
+          accumulatedDeletes.add(df);
+        }
+      }
+    }
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      DeleteFileIndex addedDeleteIndex = 
addedDeletesBySnapshot.get(snapshot.snapshotId());
+      if (addedDeleteIndex.isEmpty()) {
+        continue;
+      }
+
+      // Build cumulative delete index for this snapshot from accumulated 
deletes
+      DeleteFileIndex cumulativeDeleteIndex = 
buildCumulativeDeleteIndex(accumulatedDeletes);
+
+      // Process data files for this snapshot
+      processSnapshotForDeletedRowsTasks(
+          snapshot,
+          addedDeleteIndex,
+          cumulativeDeleteIndex,
+          fileStatusBySnapshot.get(snapshot.snapshotId()),
+          alreadyProcessedPaths,
+          snapshotOrdinals,
+          tasks);
+
+      // Accumulate this snapshot's added deletes for subsequent snapshots
+      // Apply partition pruning to only accumulate relevant delete files
+      for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+        if (partitionMatchesFilter(df)) {
+          accumulatedDeletes.add(df);
+        }
+      }
+    }
+
+    return CloseableIterable.withNoopClose(tasks);
+  }
+
+  /**
+   * Builds a map of file statuses for each snapshot, tracking which files 
were added or deleted in
+   * each snapshot.
+   */
+  private Map<Long, Map<String, ManifestEntry.Status>> 
buildFileStatusBySnapshot(
+      Deque<Snapshot> changelogSnapshots, Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot = 
Maps.newHashMap();
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      Map<String, ManifestEntry.Status> fileStatuses = Maps.newHashMap();
+
+      List<ManifestFile> changedDataManifests =
+          FluentIterable.from(snapshot.dataManifests(table().io()))
+              .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+              .toList();
+
+      ManifestGroup changedGroup =
+          new ManifestGroup(table().io(), changedDataManifests, 
ImmutableList.of())
+              .specsById(table().specs())
+              .caseSensitive(isCaseSensitive())
+              .select(scanColumns())
+              .filterData(filter())
+              .ignoreExisting()
+              .columnsToKeepStats(columnsToKeepStats());
+
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = 
changedGroup.entries()) {
+        for (ManifestEntry<DataFile> entry : entries) {
+          if (changelogSnapshotIds.contains(entry.snapshotId())) {
+            fileStatuses.put(entry.file().location(), entry.status());
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to collect file statuses", e);
+      }
+
+      fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses);
+    }
+
+    return fileStatusBySnapshot;
+  }
+
+  /** Builds a cumulative delete index from the accumulated list of delete 
files. */
+  private DeleteFileIndex buildCumulativeDeleteIndex(List<DeleteFile> 
accumulatedDeletes) {
+    if (accumulatedDeletes.isEmpty()) {
+      return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+    }
+
+    return DeleteFileIndex.builderFor(accumulatedDeletes)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Processes data files for a snapshot to create DeletedRowsScanTask for 
existing files affected
+   * by new delete files.
+   */
+  private void processSnapshotForDeletedRowsTasks(
+      Snapshot snapshot,
+      DeleteFileIndex addedDeleteIndex,
+      DeleteFileIndex cumulativeDeleteIndex,
+      Map<String, ManifestEntry.Status> currentSnapshotFiles,
+      Set<String> alreadyProcessedPaths,
+      Map<Long, Integer> snapshotOrdinals,
+      List<ChangelogScanTask> tasks) {
+
+    // Get all data files that exist in this snapshot
+    List<ManifestFile> allDataManifests = snapshot.dataManifests(table().io());
+    ManifestGroup allDataGroup =
+        new ManifestGroup(table().io(), allDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .ignoreDeleted()
+            .columnsToKeepStats(columnsToKeepStats());
+
+    if (shouldIgnoreResiduals()) {
+      allDataGroup = allDataGroup.ignoreResiduals();
+    }
+
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = 
allDataGroup.entries()) {
+      for (ManifestEntry<DataFile> entry : entries) {
+        DataFile dataFile = entry.file();
+        String filePath = dataFile.location();
+
+        // Skip if this file was ADDED or DELETED in this snapshot
+        // (those are handled by CreateDataFileChangeTasks)
+        if (currentSnapshotFiles.containsKey(filePath)) {
+          continue;
+        }
+
+        // Skip if we already created a task for this file in this snapshot
+        String key = snapshot.snapshotId() + ":" + filePath;
+        if (alreadyProcessedPaths.contains(key)) {
+          continue;
+        }
+
+        // Check if this data file is affected by newly added delete files
+        DeleteFile[] addedDeletes = addedDeleteIndex.forEntry(entry);
+        if (addedDeletes.length == 0) {
+          continue;
+        }
+
+        // This data file was EXISTING but has new delete files applied
+        // Get existing deletes from before this snapshot (cumulative)
+        DeleteFile[] existingDeletes =
+            cumulativeDeleteIndex.isEmpty()
+                ? new DeleteFile[0]
+                : cumulativeDeleteIndex.forEntry(entry);
+
+        // Create a DeletedRowsScanTask
+        int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
+        String schemaString = SchemaParser.toJson(schema());
+        String specString = 
PartitionSpecParser.toJson(table().specs().get(dataFile.specId()));

Review Comment:
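   This is a bit more complicated, but maybe we don't need to calculate the partition spec JSON again and again for every entry; it depends only on the data file's spec ID, so it could be computed once per spec ID and reused.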



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to