pvary commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726833603


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> 
snapshot.dataManifests(table().io()))
-            .filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
+    // map of delete file to the snapshot where the delete file is added
+    // the delete file is keyed by its path, and the snapshot is represented 
by the snapshot ordinal
+    Map<String, Integer> deleteFileToSnapshotOrdinal =
+        computeDeleteFileToSnapshotOrdinal(changelogSnapshots, 
snapshotOrdinals);
 
-    return manifestGroup.plan(new 
CreateDataFileChangeTasks(changelogSnapshots));
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
+        FluentIterable.from(changelogSnapshots)
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = 
snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = 
snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, 
deleteManifests)
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+                    manifestGroup = manifestGroup.planWith(planExecutor());
+                  }
+
+                  long snapshotId = snapshot.snapshotId();
+                  return manifestGroup.plan(
+                      new CreateDataFileChangeTasks(
+                          snapshotId, snapshotOrdinals, 
deleteFileToSnapshotOrdinal));

Review Comment:
   I'm not sure that we can filter with the current snapshotId. I think we need 
to filter with the `toSnapshotId` of the scan, so we don't emit records which 
are added and deleted when the `IncrementalScan` is for multiple snapshots.
   
   I have asked the corresponding question on the dev list thread, but for 
reference:
   
   > What is the expected behaviour when the `IncrementalScan` is created for 
not a single snapshot, but for multiple snapshots?
   > - S1 added PK1-V1
   > - S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
   > - S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)
   > 
   > Let's say we have IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3).
   > Or we need to return:
   > (a)
   > - PK1,V1c,INSERTED
   > 
   > Or is it ok, to return:
   > (b)
   > - PK1,V1,INSERTED
   > - PK1,V1,DELETED
   > - PK1,V1b,INSERTED
   > - PK1,V1b,DELETED
   > - PK1,V1c,INSERTED
   
   I think the (a) is the correct behaviour.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to