wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759573665
########## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }

-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);

-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
+    // map of delete file to the snapshot where the delete file is added
+    // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
+    Map<String, Integer> deleteFileToSnapshotOrdinal =
+        computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);

-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
+        FluentIterable.from(changelogSnapshots)
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
I see where you're coming from. Before this change, there was only one call to `ManifestGroup::plan`: the `ManifestGroup` contained the data manifests from all the snapshots in the range of interest, only manifest entries added in one of those snapshots were considered, manifests with only existing entries were ignored, and a single call to `plan` generated the scan tasks. That approach only works for the copy-on-write case, where there are no delete files: under copy-on-write, a delete always adds new data files, so the change shows up in the `ManifestGroup`. When we have delete files, the situation you describe can easily arise, where a snapshot adds no new data files, only new delete files. That is why we need a different approach.

With this change, a `ManifestGroup` is created for each snapshot in the range of interest and `ManifestGroup::plan` is called on it; the resulting `CloseableIterable`s of scan tasks are then concatenated into one `CloseableIterable`. We no longer ignore data manifests with only existing entries; in fact, we have to consider them: in a snapshot that adds no data files, just delete files, we need to look at the existing data files and see whether any of the new deletes apply to them.

Each call to `ManifestGroup::plan` is still only parallelizable up to the number of data manifest files in that one `ManifestGroup`. Additional parallelism is possible in principle: since we make multiple `ManifestGroup::plan` calls (on different `ManifestGroup` instances), each unit of work from every call could be distributed to one shared executor service. But I'm not aware that we have the API for that.
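The `computeSnapshotOrdinals` helper referenced in the diff is not shown in this hunk. A minimal sketch of what such a helper could look like, assuming the changelog snapshots are already in commit order (the `Snapshot` interface below is a hypothetical stand-in for Iceberg's, reduced to the one accessor needed):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SnapshotOrdinals {
  // Hypothetical stand-in for org.apache.iceberg.Snapshot,
  // reduced to the one accessor this sketch needs.
  interface Snapshot {
    long snapshotId();
  }

  // Assign each snapshot an ordinal reflecting its position in the
  // (already ordered) range: 0 for the oldest, 1 for the next, and so on.
  static Map<Long, Integer> computeSnapshotOrdinals(Iterable<Snapshot> snapshots) {
    Map<Long, Integer> ordinals = new LinkedHashMap<>();
    int ordinal = 0;
    for (Snapshot snapshot : snapshots) {
      ordinals.put(snapshot.snapshotId(), ordinal++);
    }
    return ordinals;
  }
}
```

The ordinal, rather than the raw snapshot id, gives a compact "position in the range" that can be compared to decide whether a delete file was added before, in, or after a given snapshot.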
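The concatenation step the comment describes (stitching the per-snapshot plan results into one iterable) is what Iceberg's `CloseableIterable.concat` provides, with resource cleanup on top. A self-contained sketch of the same idea using plain `Iterable`, to show that the concatenation stays lazy, one snapshot's plan at a time:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ConcatPlans {
  // Lazily concatenate one iterable of scan tasks per snapshot into a single
  // iterable, analogous to concatenating per-snapshot plan() results.
  static <T> Iterable<T> concat(Iterable<? extends Iterable<T>> plans) {
    return () -> new Iterator<T>() {
      private final Iterator<? extends Iterable<T>> outer = plans.iterator();
      private Iterator<T> inner = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Advance to the next non-empty per-snapshot plan, skipping snapshots
        // that produced no tasks.
        while (!inner.hasNext() && outer.hasNext()) {
          inner = outer.next().iterator();
        }
        return inner.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return inner.next();
      }
    };
  }
}
```

Because the outer iterator is only advanced on demand, a snapshot's plan is not consumed until the caller actually reaches its tasks.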
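The coarser-grained parallelism that is achievable with standard `java.util.concurrent` tooling, as opposed to the finer-grained distribution the comment says the current API lacks, can be sketched as follows. This is an illustration, not Iceberg API: it parallelizes across snapshots at the granularity of whole plan calls, while the work units inside each call remain bounded by that call's own manifest count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ParallelPlanSketch {
  // Submit each per-snapshot planning call to a shared executor and collect
  // the results in snapshot order. Whole calls run concurrently, but the
  // individual work units inside each call are not redistributed.
  static <T> List<T> planAll(List<Callable<T>> planCalls, ExecutorService executor)
      throws InterruptedException, ExecutionException {
    List<Future<T>> futures = new ArrayList<>();
    for (Callable<T> call : planCalls) {
      futures.add(executor.submit(call));
    }
    List<T> results = new ArrayList<>();
    for (Future<T> future : futures) {
      results.add(future.get()); // preserves snapshot order regardless of completion order
    }
    return results;
  }
}
```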
That is not so easy to do with the API we have, I think.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org