pvary commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2445329999
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +156,464 @@ private static Map<Long, Integer>
computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}
+  /**
+   * Builds a delete file index for the delete files that already exist in the (exclusive) start
+   * snapshot of the changelog range. These deletes should be applied to data files but should not
+   * generate DELETE changelog rows. The start snapshot's delete manifests are first pruned by
+   * partition and then loaded through {@code loadDeleteFilesWithCache} to avoid redundant manifest
+   * parsing.
+   *
+   * @param fromSnapshotIdExclusive the exclusive start snapshot ID; when {@code null}, no
+   *     pre-existing deletes are indexed
+   * @return an index of the pre-existing delete files; an empty index when there is no start
+   *     snapshot, the start snapshot has no delete manifests, or pruning removes all of them
+   * @throws IllegalStateException if the start snapshot cannot be found in the table
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return emptyDeleteIndex();
+    }
+
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return emptyDeleteIndex();
+    }
+
+    // Prune manifests based on partition filter to avoid processing irrelevant manifests
+    List<ManifestFile> prunedManifests = pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return emptyDeleteIndex();
+    }
+
+    // Load delete files with caching to avoid redundant manifest parsing
+    List<DeleteFile> deleteFiles = loadDeleteFilesWithCache(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Returns a delete file index that contains no delete files. Shared by the early-return paths so
+   * the empty index is built in exactly one place.
+   */
+  private static DeleteFileIndex emptyDeleteIndex() {
+    return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+  }
+
+ /**
+ * Builds per-snapshot delete file indexes for newly added delete files in
each changelog
+ * snapshot. These deletes should generate DELETE changelog rows. Uses
caching to avoid re-parsing
+ * manifests.
+ */
+ private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot>
changelogSnapshots) {
+ Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newHashMap();
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ List<ManifestFile> snapshotDeleteManifests =
snapshot.deleteManifests(table().io());
+ if (snapshotDeleteManifests.isEmpty()) {
+ addedDeletesBySnapshot.put(
+ snapshot.snapshotId(),
DeleteFileIndex.builderFor(ImmutableList.of()).build());
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]