wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759573665


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
+    // map of delete file to the snapshot where the delete file is added
+    // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
+    Map<String, Integer> deleteFileToSnapshotOrdinal =
+        computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);
 
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
+        FluentIterable.from(changelogSnapshots)
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {

Review Comment:
   I see where you're coming from.
   Before this change, there is only one call to `ManifestGroup::plan`. The `ManifestGroup` contains manifests from all the snapshots in the range of interest; only manifest entries added in one of these snapshots are considered, and manifests with only existing entries are ignored. A single call to `plan` then generates the scan tasks. This approach only works in the copy-on-write case, where there are no delete files: under copy-on-write, when deletes occur, new data files are always added and so show up in the `ManifestGroup`. When we have delete files, the situation you describe can easily arise, where a snapshot adds no new data files, only new delete files. That is why we need a different approach.
   With this change, a `ManifestGroup` is created for each snapshot in the range of interest, and `ManifestGroup::plan` is called on it; the resulting `CloseableIterable`s of scan tasks are then concatenated into one `CloseableIterable`. We no longer ignore data manifests with existing entries; in fact, we have to consider them: in a snapshot with no added data files, just added delete files, we consider the existing data files and check whether any of the new deletes apply to them.
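   To make the shape of this concrete, here is a minimal sketch of the new flow; `planTasksFor` is a hypothetical helper standing in for the per-snapshot `ManifestGroup` setup shown in the diff, not a method in this PR:
   ```java
   // Build one plan per changelog snapshot...
   Iterable<CloseableIterable<ChangelogScanTask>> plans =
       FluentIterable.from(changelogSnapshots)
           .transform(snapshot -> planTasksFor(snapshot));

   // ...then flatten the per-snapshot task iterables into a single
   // CloseableIterable using CloseableIterable.concat.
   return CloseableIterable.concat(plans);
   ```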
   Now, each call to `ManifestGroup::plan` is still only parallelizable over the number of data manifest files in that `ManifestGroup`. Additional parallelism is possible in principle: since we are making multiple `ManifestGroup::plan` calls (on different `ManifestGroup` instances), we could distribute each unit of work from every call to a single executor service. But I'm not aware that we have the API for that; it is not so easy to do with the API we have, I think.
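   What we could do today, without new API, is coarser-grained: submit each whole per-snapshot `plan` call to the shared pool. A sketch under that assumption (again with the hypothetical `planTasksFor`):
   ```java
   ExecutorService pool = planExecutor();

   // One task per snapshot: this parallelizes across snapshots, but the work
   // inside each plan() call is still bounded by the number of data manifests
   // in that snapshot's ManifestGroup.
   List<Future<CloseableIterable<ChangelogScanTask>>> futures = Lists.newArrayList();
   for (Snapshot snapshot : changelogSnapshots) {
     futures.add(pool.submit(() -> planTasksFor(snapshot)));
   }
   ```
   That still would not distribute each unit of work from every call across one executor, which is the part I don't think the current API supports.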



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

