gaborkaszab commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2031063476


##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +142,114 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition stats file till the
+   * given snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
-    if (stats.isEmpty()) {
+    StructType partitionType = Partitioning.partitionType(table);
+
+    PartitionMap<PartitionStats> resultStatsMap;
+    if (recompute) {
+      resultStatsMap = PartitionStatsUtil.computeStatsFull(table, snapshot);
+    } else {
+      resultStatsMap = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
+    if (resultStatsMap.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
-    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
+    List<PartitionStats> sortedStats =
+        PartitionStatsUtil.sortStats(resultStatsMap.values(), partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static PartitionMap<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(

Review Comment:
   I think there are several approaches that could work well. I understand 
that the most convenient option would be a single `computeStats` function 
that decides between incremental and full computation. On the other hand, that 
could hide some details from users. My impression is that Iceberg APIs are 
designed to have clear boundaries and not to mix functionalities such as 
incremental and full stats computation.
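
   To make the trade-off concrete, here is a minimal sketch of the two API 
shapes. It is only an illustration: `StatsApiShapes`, `Mode`, and all method 
names below are hypothetical stand-ins, not Iceberg classes or the code in 
this PR, and the `Optional<Long>` argument simply models "the snapshot ID of 
an existing stats file, if any".

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration only; none of these are Iceberg APIs.
final class StatsApiShapes {
  enum Mode { FULL, INCREMENTAL }

  // Shape A: a single entry point that silently picks the mode.
  // Convenient, but the caller cannot tell which path ran without
  // inspecting the result.
  static Mode computeStats(Optional<Long> previousStatsSnapshotId) {
    return previousStatsSnapshotId.isPresent() ? Mode.INCREMENTAL : Mode.FULL;
  }

  // Shape B: separate entry points with a clear boundary.
  static Mode computeStatsFull() {
    return Mode.FULL;
  }

  // The incremental variant fails loudly when there is nothing to build on,
  // so the decision to fall back to a full computation stays with the caller.
  static Mode computeStatsIncremental(Optional<Long> previousStatsSnapshotId) {
    if (!previousStatsSnapshotId.isPresent()) {
      throw new IllegalStateException("No previous partition stats file to build on");
    }
    return Mode.INCREMENTAL;
  }

  private StatsApiShapes() {}
}
```

   With shape A the fallback is implicit. With shape B the caller has to 
handle the missing-stats case explicitly, which is more verbose but keeps the 
two functionalities separate.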
   I believe that to reach a conclusion we might want to raise this question 
on dev@ for wider visibility. People are probably busy with the upcoming 
Iceberg Summit, but we could still get good insights. @ajantha-bhat WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


