danielcweeks commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2437513072
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +155,468 @@ private static Map<Long, Integer>
computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}
+ /**
+ * Builds a delete file index for existing deletes that were present before
the start snapshot.
+ * These deletes should be applied to data files but should not generate
DELETE changelog rows.
+ * Uses manifest pruning and caching to optimize performance.
+ */
+ private DeleteFileIndex buildExistingDeleteIndex(Long
fromSnapshotIdExclusive) {
+ if (fromSnapshotIdExclusive == null) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+ if (fromSnapshot == null) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ List<ManifestFile> existingDeleteManifests =
fromSnapshot.deleteManifests(table().io());
+ if (existingDeleteManifests.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ // Prune manifests based on partition filter to avoid processing
irrelevant manifests
+ List<ManifestFile> prunedManifests =
pruneManifestsByPartition(existingDeleteManifests);
+ if (prunedManifests.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ // Load delete files with caching to avoid redundant manifest parsing
+ List<DeleteFile> deleteFiles = loadDeleteFilesWithCache(prunedManifests);
+
+ return DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ }
+
+ /**
+ * Builds per-snapshot delete file indexes for newly added delete files in
each changelog
+ * snapshot. These deletes should generate DELETE changelog rows. Uses
caching to avoid re-parsing
+ * manifests.
+ */
+ private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot>
changelogSnapshots) {
+ Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newHashMap();
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ List<ManifestFile> snapshotDeleteManifests =
snapshot.deleteManifests(table().io());
+ if (snapshotDeleteManifests.isEmpty()) {
+ addedDeletesBySnapshot.put(
+ snapshot.snapshotId(),
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+ continue;
+ }
+
+ // Filter to only include delete files added in this snapshot
+ List<ManifestFile> addedDeleteManifests =
+ FluentIterable.from(snapshotDeleteManifests)
+ .filter(manifest ->
manifest.snapshotId().equals(snapshot.snapshotId()))
+ .toList();
+
+ if (addedDeleteManifests.isEmpty()) {
+ addedDeletesBySnapshot.put(
+ snapshot.snapshotId(),
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+ } else {
+ // Load delete files with caching to avoid redundant manifest parsing
+ List<DeleteFile> deleteFiles =
loadDeleteFilesWithCache(addedDeleteManifests);
+
+ DeleteFileIndex index =
+ DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ addedDeletesBySnapshot.put(snapshot.snapshotId(), index);
+ }
+ }
+
+ return addedDeletesBySnapshot;
+ }
+
+ /**
+ * Plans tasks for EXISTING data files that are affected by newly added
delete files. These files
+ * were not added or deleted in the changelog snapshot range, but have new
delete files applied to
+ * them.
+ */
+ private CloseableIterable<ChangelogScanTask> planDeletedRowsTasks(
+ Deque<Snapshot> changelogSnapshots,
+ DeleteFileIndex existingDeleteIndex,
+ Map<Long, DeleteFileIndex> addedDeletesBySnapshot,
+ Set<Long> changelogSnapshotIds) {
+
+ Map<Long, Integer> snapshotOrdinals =
computeSnapshotOrdinals(changelogSnapshots);
+ List<ChangelogScanTask> tasks = Lists.newArrayList();
+
+ // Build a map of file statuses for each snapshot
+ Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
+ buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds);
+
+ // Process snapshots in order, tracking which files have been handled
+ Set<String> alreadyProcessedPaths = Sets.newHashSet();
+
+ // Accumulate actual DeleteFile entries chronologically
+ List<DeleteFile> accumulatedDeletes = Lists.newArrayList();
+
+ // Start with deletes from before the changelog range
+ // Apply partition pruning to only accumulate relevant delete files
+ if (!existingDeleteIndex.isEmpty()) {
+ for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) {
+ if (partitionMatchesFilter(df)) {
+ accumulatedDeletes.add(df);
+ }
+ }
+ }
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ DeleteFileIndex addedDeleteIndex =
addedDeletesBySnapshot.get(snapshot.snapshotId());
+ if (addedDeleteIndex.isEmpty()) {
+ continue;
+ }
+
+ // Build cumulative delete index for this snapshot from accumulated
deletes
+ DeleteFileIndex cumulativeDeleteIndex =
buildCumulativeDeleteIndex(accumulatedDeletes);
+
+ // Process data files for this snapshot
+ processSnapshotForDeletedRowsTasks(
+ snapshot,
+ addedDeleteIndex,
+ cumulativeDeleteIndex,
+ fileStatusBySnapshot.get(snapshot.snapshotId()),
+ alreadyProcessedPaths,
+ snapshotOrdinals,
+ tasks);
+
+ // Accumulate this snapshot's added deletes for subsequent snapshots
+ // Apply partition pruning to only accumulate relevant delete files
+ for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+ if (partitionMatchesFilter(df)) {
+ accumulatedDeletes.add(df);
+ }
+ }
+ }
+
+ return CloseableIterable.withNoopClose(tasks);
+ }
+
+ /**
+ * Builds a map of file statuses for each snapshot, tracking which files
were added or deleted in
+ * each snapshot.
+ */
+ private Map<Long, Map<String, ManifestEntry.Status>>
buildFileStatusBySnapshot(
+ Deque<Snapshot> changelogSnapshots, Set<Long> changelogSnapshotIds) {
+
+ Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
Maps.newHashMap();
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ Map<String, ManifestEntry.Status> fileStatuses = Maps.newHashMap();
+
+ List<ManifestFile> changedDataManifests =
+ FluentIterable.from(snapshot.dataManifests(table().io()))
+ .filter(manifest ->
manifest.snapshotId().equals(snapshot.snapshotId()))
+ .toList();
+
+ ManifestGroup changedGroup =
+ new ManifestGroup(table().io(), changedDataManifests,
ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .ignoreExisting()
+ .columnsToKeepStats(columnsToKeepStats());
+
+ try (CloseableIterable<ManifestEntry<DataFile>> entries =
changedGroup.entries()) {
+ for (ManifestEntry<DataFile> entry : entries) {
+ if (changelogSnapshotIds.contains(entry.snapshotId())) {
+ fileStatuses.put(entry.file().location(), entry.status());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect file statuses", e);
+ }
+
+ fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses);
+ }
+
+ return fileStatusBySnapshot;
+ }
+
+ /** Builds a cumulative delete index from the accumulated list of delete
files. */
+ private DeleteFileIndex buildCumulativeDeleteIndex(List<DeleteFile>
accumulatedDeletes) {
+ if (accumulatedDeletes.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ return DeleteFileIndex.builderFor(accumulatedDeletes)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ }
+
+ /**
+ * Processes data files for a snapshot to create DeletedRowsScanTask for
existing files affected
+ * by new delete files.
+ */
+ private void processSnapshotForDeletedRowsTasks(
+ Snapshot snapshot,
+ DeleteFileIndex addedDeleteIndex,
+ DeleteFileIndex cumulativeDeleteIndex,
+ Map<String, ManifestEntry.Status> currentSnapshotFiles,
+ Set<String> alreadyProcessedPaths,
+ Map<Long, Integer> snapshotOrdinals,
+ List<ChangelogScanTask> tasks) {
+
+ // Get all data files that exist in this snapshot
+ List<ManifestFile> allDataManifests = snapshot.dataManifests(table().io());
+ ManifestGroup allDataGroup =
+ new ManifestGroup(table().io(), allDataManifests, ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .ignoreDeleted()
+ .columnsToKeepStats(columnsToKeepStats());
+
+ if (shouldIgnoreResiduals()) {
+ allDataGroup = allDataGroup.ignoreResiduals();
+ }
+
+ try (CloseableIterable<ManifestEntry<DataFile>> entries =
allDataGroup.entries()) {
+ for (ManifestEntry<DataFile> entry : entries) {
+ DataFile dataFile = entry.file();
+ String filePath = dataFile.location();
+
+ // Skip if this file was ADDED or DELETED in this snapshot
+ // (those are handled by CreateDataFileChangeTasks)
+ if (currentSnapshotFiles.containsKey(filePath)) {
+ continue;
+ }
+
+ // Skip if we already created a task for this file in this snapshot
+ String key = snapshot.snapshotId() + ":" + filePath;
+ if (alreadyProcessedPaths.contains(key)) {
+ continue;
+ }
+
+ // Check if this data file is affected by newly added delete files
+ DeleteFile[] addedDeletes = addedDeleteIndex.forEntry(entry);
+ if (addedDeletes.length == 0) {
+ continue;
+ }
+
+ // This data file was EXISTING but has new delete files applied
+ // Get existing deletes from before this snapshot (cumulative)
+ DeleteFile[] existingDeletes =
+ cumulativeDeleteIndex.isEmpty()
+ ? new DeleteFile[0]
+ : cumulativeDeleteIndex.forEntry(entry);
+
+ // Create a DeletedRowsScanTask
+ int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
+ String schemaString = SchemaParser.toJson(schema());
+ String specString =
PartitionSpecParser.toJson(table().specs().get(dataFile.specId()));
+ PartitionSpec spec = table().specs().get(dataFile.specId());
+ Expression residualFilter = shouldIgnoreResiduals() ?
Expressions.alwaysTrue() : filter();
+ ResidualEvaluator residuals = ResidualEvaluator.of(spec,
residualFilter, isCaseSensitive());
+
+ tasks.add(
+ new BaseDeletedRowsScanTask(
+ changeOrdinal,
+ snapshot.snapshotId(),
+ dataFile.copy(shouldKeepStats()),
+ addedDeletes,
+ existingDeletes,
+ schemaString,
+ specString,
+ residuals));
+
+ // Mark this file+snapshot as processed
+ alreadyProcessedPaths.add(key);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to plan deleted rows tasks", e);
+ }
+ }
+
+ private boolean shouldKeepStats() {
+ Set<Integer> columns = columnsToKeepStats();
+ return columns != null && !columns.isEmpty();
+ }
+
+ /**
+ * Loads delete files from manifests using a cache to avoid redundant
manifest parsing. This
+ * significantly improves planning performance when the same manifests are
accessed across
+ * multiple scans or within a scan range.
+ *
+ * @param manifests the delete manifests to load
+ * @return list of delete files
+ */
+ private List<DeleteFile> loadDeleteFilesWithCache(List<ManifestFile>
manifests) {
+ DeleteManifestCache cache = DeleteManifestCache.instance();
+ List<DeleteFile> allDeleteFiles = Lists.newArrayList();
+
+ for (ManifestFile manifest : manifests) {
+ // Try to get from cache first
+ List<DeleteFile> cachedFiles = cache.get(manifest);
+
+ if (cachedFiles != null) {
+ // Cache hit - reuse parsed delete files
+ allDeleteFiles.addAll(cachedFiles);
+ } else {
+ // Cache miss - parse manifest and populate cache
+ List<DeleteFile> manifestDeleteFiles =
loadDeleteFilesFromManifest(manifest);
+ cache.put(manifest, manifestDeleteFiles);
+ allDeleteFiles.addAll(manifestDeleteFiles);
+ }
+ }
+
+ return allDeleteFiles;
+ }
+
+ /**
+ * Prunes delete manifests based on partition filter to avoid processing
irrelevant manifests.
+ * This significantly improves performance when only a subset of partitions
are relevant to the
+ * scan.
+ *
+ * @param manifests all delete manifests to consider
+ * @return list of manifests that might contain relevant delete files
+ */
+ private List<ManifestFile> pruneManifestsByPartition(List<ManifestFile>
manifests) {
+ Expression currentFilter = filter();
+
+ // If there's no filter, return all manifests
+ if (currentFilter == null ||
currentFilter.equals(Expressions.alwaysTrue())) {
+ return manifests;
+ }
+
+ List<ManifestFile> prunedManifests = Lists.newArrayList();
+
+ for (ManifestFile manifest : manifests) {
+ PartitionSpec spec = table().specs().get(manifest.partitionSpecId());
+ if (spec == null || spec.isUnpartitioned()) {
+ // Include unpartitioned manifests
+ prunedManifests.add(manifest);
+ continue;
Review Comment:
nit: this control flow feels awkward and should probably be an `else if`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]