ajantha-bhat commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2011326051


##########
core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java:
##########
@@ -53,14 +56,43 @@ public static Collection<PartitionStats> computeStats(Table table, Snapshot snap
 
     StructType partitionType = Partitioning.partitionType(table);
     List<ManifestFile> manifests = snapshot.allManifests(table.io());
-    Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(manifests)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+    return collectStats(table, manifests, partitionType).values();
+  }
+
+  /**
+   * Computes the partition stats incrementally, from after the given snapshot up to the current snapshot.
+   *
+   * @param table the table for which partition stats are to be computed.
+   * @param afterSnapshot the snapshot after which partition stats are computed (exclusive).
+   * @param currentSnapshot the snapshot up to which partition stats are computed (inclusive).
+   * @return the {@link PartitionMap} of {@link PartitionStats}
+   */
+  public static PartitionMap<PartitionStats> computeStatsIncremental(
+      Table table, Snapshot afterSnapshot, Snapshot currentSnapshot) {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
+    Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null");
+    Preconditions.checkArgument(afterSnapshot != null, "Snapshot cannot be null");
+    Preconditions.checkArgument(currentSnapshot != afterSnapshot, "Both the snapshots are same");
+    Preconditions.checkArgument(
+        SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), afterSnapshot.snapshotId()),
+        "Starting snapshot %s is not an ancestor of current snapshot %s",
+        afterSnapshot.snapshotId(),
+        currentSnapshot.snapshotId());
 
-    return mergeStats(statsByManifest, table.specs());
+    Set<Long> snapshotIdsRange =
+        Sets.newHashSet(
+            SnapshotUtil.ancestorIdsBetween(
+                currentSnapshot.snapshotId(), afterSnapshot.snapshotId(), table::snapshot));
+    StructType partitionType = Partitioning.partitionType(table);
+    List<ManifestFile> manifests =
+        currentSnapshot.allManifests(table.io()).stream()
+            .filter(
+                manifestFile ->
+                    snapshotIdsRange.contains(manifestFile.snapshotId())
+                        && !manifestFile.hasExistingFiles())

Review Comment:
   Note that manifests with existing entries (where `hasExistingFiles()` is true) are intentionally not included in the incremental computation, because the stats for those files were already written when the files were first added.
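A minimal plain-Java sketch of the selection rule above, assuming a linear snapshot history. `Manifest` and `ancestorIdsBetween` here are hypothetical stand-ins written for illustration, not Iceberg's `ManifestFile` or `SnapshotUtil` APIs; the snapshot range follows the javadoc in the diff (after snapshot exclusive, current snapshot inclusive).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IncrementalManifestFilter {

  // Hypothetical stand-in for a manifest: only the two properties the filter reads.
  record Manifest(String path, long snapshotId, boolean hasExistingFiles) {}

  // Hypothetical helper: walks parent links from currentId back to, but excluding, afterId.
  // parents maps snapshot ID -> parent snapshot ID (null for the root snapshot).
  // Assumes afterId is an ancestor of currentId (the diff checks this up front).
  static Set<Long> ancestorIdsBetween(long currentId, long afterId, Map<Long, Long> parents) {
    Set<Long> ids = new HashSet<>();
    Long id = currentId;
    while (id != null && id != afterId) {
      ids.add(id);
      id = parents.get(id);
    }
    return ids;
  }

  static List<Manifest> manifestsToScan(List<Manifest> all, Set<Long> range) {
    List<Manifest> result = new ArrayList<>();
    for (Manifest m : all) {
      // Keep manifests written by a snapshot in the range; skip manifests with existing
      // entries, whose stats were already counted when those files were first added.
      if (range.contains(m.snapshotId()) && !m.hasExistingFiles()) {
        result.add(m);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Linear history 1 <- 2 <- 3 <- 4; compute stats after snapshot 2 up to snapshot 4.
    Map<Long, Long> parents = new HashMap<>();
    parents.put(1L, null);
    parents.put(2L, 1L);
    parents.put(3L, 2L);
    parents.put(4L, 3L);

    Set<Long> range = ancestorIdsBetween(4L, 2L, parents);
    List<Manifest> all =
        List.of(
            new Manifest("m1.avro", 1L, false),
            new Manifest("m3.avro", 3L, false),
            new Manifest("m4-rewritten.avro", 4L, true),
            new Manifest("m4.avro", 4L, false));
    for (Manifest m : manifestsToScan(all, range)) {
      System.out.println(m.path());
    }
  }
}
```

Running `main` prints `m3.avro` and `m4.avro`: the snapshot-1 manifest falls outside the range, and the manifest in snapshot 4 that carries existing entries is skipped.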



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

