pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726833603

##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
+    // map of delete file to the snapshot where the delete file is added
+    // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
+    Map<String, Integer> deleteFileToSnapshotOrdinal =
+        computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);
 
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
+        FluentIterable.from(changelogSnapshots)
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+                    manifestGroup = manifestGroup.planWith(planExecutor());
+                  }
+
+                  long snapshotId = snapshot.snapshotId();
+                  return manifestGroup.plan(
+                      new CreateDataFileChangeTasks(
+                          snapshotId, snapshotOrdinals, deleteFileToSnapshotOrdinal));

Review Comment:
   I'm not sure that we can filter with the current `snapshotId`. I think we need to filter with the `toSnapshotId` of the scan, so that we don't emit records which are both added and deleted within the range when the `IncrementalScan` covers multiple snapshots.

   I have asked the corresponding question on the dev list thread, but for reference:

   > What is the expected behaviour when the `IncrementalScan` is created not for a single snapshot, but for multiple snapshots?
   > - S1 added PK1-V1
   > - S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
   > - S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)
   >
   > Let's say we have `IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3)`.
   > Do we need to return:
   > (a)
   > - PK1,V1c,INSERTED
   >
   > Or is it ok to return:
   > (b)
   > - PK1,V1,INSERTED
   > - PK1,V1,DELETED
   > - PK1,V1b,INSERTED
   > - PK1,V1b,DELETED
   > - PK1,V1c,INSERTED

   I think (a) is the correct behaviour.
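
   To make the difference between (a) and (b) concrete, here is a minimal, self-contained sketch. It does not use any Iceberg API: the `Change` class, the `Op` enum, and the `perSnapshotChanges` and `netChanges` methods are hypothetical names used only for illustration, and rows are matched by key and value rather than by data file and delete file as the real planner would have to do. It only shows how the per-snapshot output (b) collapses to the net output (a) once rows that are inserted and later deleted inside the scanned range are dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangelogCollapseSketch {

  // Hypothetical stand-ins for changelog rows; these are not Iceberg classes.
  enum Op { INSERTED, DELETED }

  static final class Change {
    final String key;
    final String value;
    final Op op;

    Change(String key, String value, Op op) {
      this.key = key;
      this.value = value;
      this.op = op;
    }
  }

  // Option (b): the per-snapshot changes from the example, in commit order S1, S2, S3.
  static List<Change> perSnapshotChanges() {
    List<Change> changes = new ArrayList<>();
    changes.add(new Change("PK1", "V1", Op.INSERTED)); // S1
    changes.add(new Change("PK1", "V1", Op.DELETED)); // S2
    changes.add(new Change("PK1", "V1b", Op.INSERTED)); // S2
    changes.add(new Change("PK1", "V1b", Op.DELETED)); // S3
    changes.add(new Change("PK1", "V1c", Op.INSERTED)); // S3
    return changes;
  }

  // Option (a): drop every INSERTED row that is DELETED later in the same range.
  static List<Change> netChanges(List<Change> ordered) {
    // Rows inserted within the range and not yet deleted, keyed by "key,value".
    Map<String, Change> pendingInserts = new LinkedHashMap<>();
    List<Change> result = new ArrayList<>();
    for (Change change : ordered) {
      String id = change.key + "," + change.value;
      if (change.op == Op.INSERTED) {
        pendingInserts.put(id, change);
      } else if (pendingInserts.remove(id) == null) {
        // The row existed before the range, so this delete is a real net change.
        result.add(change);
      }
    }
    // Inserts that survived until the end of the range are net changes as well.
    result.addAll(pendingInserts.values());
    return result;
  }

  public static void main(String[] args) {
    for (Change c : netChanges(perSnapshotChanges())) {
      System.out.println(c.key + "," + c.value + "," + c.op);
    }
  }
}
```

   For the S1..S3 example this prints the single row `PK1,V1c,INSERTED`, which is output (a). A delete of a row that was added before `fromSnapshot` would still be emitted, because no matching insert is pending for it.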