gaborkaszab commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2027152279


##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {

Review Comment:
   nit: in `PartitionStatsUtil` you call the extra flag `incremental` while you 
call it `recompute` here. I'd go for `incremental` here too for consistency.



##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
+    StructType partitionType = Partitioning.partitionType(table);
+
+    Collection<PartitionStats> stats;
+    if (recompute) {
+      stats = PartitionStatsUtil.computeStats(table, snapshot);
+    } else {
+      stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
     if (stats.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static Collection<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws 
IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
+          "Previous stats not found for incremental compute. Try full 
compute");
+    }
+
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+    // read previous stats, note that partition field will be read as 
GenericRecord
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), 
Files.localInput(statisticsFile.path()))) {

Review Comment:
   Just brainstorming: If the referenced stat file doesn't exist we throw some 
data reading specific exception here (haven't found a test to see what 
exception, but probably I wasn't searching hard enough). What I have in mind is 
that we could wrap it into an exception that is relevant for the incremental 
stats computing. Like `IncrementalComputeStatsFailedException` or such. Then we 
could include a member in this specific exception that holds the snapshot ID of 
`statisticsFile`. On the caller side then the caller could decide to fix this 
broken state of the table/stats and invoke compute stats for that particular 
snapshot.
   LMK WDYT!



##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
+    StructType partitionType = Partitioning.partitionType(table);
+
+    Collection<PartitionStats> stats;
+    if (recompute) {
+      stats = PartitionStatsUtil.computeStats(table, snapshot);
+    } else {
+      stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
     if (stats.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static Collection<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws 
IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
+          "Previous stats not found for incremental compute. Try full 
compute");
+    }
+
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+    // read previous stats, note that partition field will be read as 
GenericRecord
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), 
Files.localInput(statisticsFile.path()))) {
+      oldStats.forEach(
+          partitionStats ->
+              statsMap.put(partitionStats.specId(), 
partitionStats.partition(), partitionStats));
+    }
+
+    // incrementally compute the new stats, partition field will be written as 
PartitionData
+    PartitionMap<PartitionStats> incrementalStatsMap =
+        PartitionStatsUtil.computeStatsIncremental(
+            table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+    // convert PartitionData into GenericRecord and merge stats
+    incrementalStatsMap.forEach(
+        (key, value) ->
+            statsMap.merge(
+                Pair.of(key.first(), partitionDataToRecord((PartitionData) 
key.second())),
+                value,
+                (existingEntry, newEntry) -> {
+                  existingEntry.appendStats(newEntry);
+                  return existingEntry;
+                }));
+
+    return statsMap.values();
+  }
+
+  private static GenericRecord partitionDataToRecord(PartitionData data) {
+    GenericRecord record = GenericRecord.create(data.getPartitionType());
+    for (int index = 0; index < record.size(); index++) {
+      record.set(index, data.get(index));
+    }
+
+    return record;
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) 
{
+    List<PartitionStatisticsFile> partitionStatisticsFiles = 
table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;
+    }
+
+    Map<Long, PartitionStatisticsFile> stats =
+        partitionStatisticsFiles.stream()
+            .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, 
file -> file));
+    for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, 
table::snapshot)) {
+      if (stats.containsKey(snapshot.snapshotId())) {
+        return stats.get(snapshot.snapshotId());
+      }
+    }
+
+    throw new RuntimeException(

Review Comment:
   I'd introduce a new exception instead of this general one. See one of my 
comments above.



##########
core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java:
##########
@@ -40,27 +44,48 @@ public class PartitionStatsUtil {
   private PartitionStatsUtil() {}
 
   /**
-   * Computes the partition stats for the given snapshot of the table.
+   * Fully computes the partition stats for the given snapshot of the table.
    *
    * @param table the table for which partition stats to be computed.
    * @param snapshot the snapshot for which partition stats is computed.
    * @return the collection of {@link PartitionStats}
    */
   public static Collection<PartitionStats> computeStats(Table table, Snapshot 
snapshot) {
-    Preconditions.checkArgument(table != null, "table cannot be null");
-    Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must 
be partitioned");
-    Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must 
be partitioned");
+    Preconditions.checkArgument(snapshot != null, "Current snapshot cannot be 
null");
 
-    StructType partitionType = Partitioning.partitionType(table);
-    List<ManifestFile> manifests = snapshot.allManifests(table.io());
-    Queue<PartitionMap<PartitionStats>> statsByManifest = 
Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(manifests)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(manifest -> statsByManifest.add(collectStats(table, manifest, 
partitionType)));
+    return collectStats(table, snapshot, file -> true, false /* incremental 
*/).values();
+  }
 
-    return mergeStats(statsByManifest, table.specs());
+  /**
+   * Incrementally computes the partition stats after the given snapshot to 
current snapshot.
+   *
+   * @param table the table for which partition stats to be computed.
+   * @param fromSnapshot the snapshot after which partition stats is computed 
(exclusive).
+   * @param currentSnapshot the snapshot till which partition stats is 
computed (inclusive).
+   * @return the {@link PartitionMap} of {@link PartitionStats}
+   */
+  public static PartitionMap<PartitionStats> computeStatsIncremental(
+      Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) {

Review Comment:
   nit: the name `currentSnapshot` might be misleading. Since we have a 
`fromSnapshot` can this be a `toSnapshot` or `targetSnapshot` or something 
similar?



##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +142,114 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
-    if (stats.isEmpty()) {
+    StructType partitionType = Partitioning.partitionType(table);
+
+    PartitionMap<PartitionStats> resultStatsMap;
+    if (recompute) {
+      resultStatsMap = PartitionStatsUtil.computeStatsFull(table, snapshot);
+    } else {
+      resultStatsMap = incrementalComputeAndMerge(table, snapshot, 
partitionType);
+    }
+
+    if (resultStatsMap.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
-    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
+    List<PartitionStats> sortedStats =
+        PartitionStatsUtil.sortStats(resultStatsMap.values(), partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static PartitionMap<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws 
IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(

Review Comment:
   I'm very late to this conversation, sorry about that :) I think we should 
talk a bit about how a user would use these APIs to compute stats and then we 
might be able to sort this disagreement out too.
   
   I see [the other PR](https://github.com/apache/iceberg/pull/12451) to 
introduce a `compute_partition_stats` stored proc for the "full-compute" path. 
I'd assume there would be another proc, `incremental_compute_partition_stats` 
or similar that will execute the "incremental-compute" path. If my assumption 
is correct, I think the question is why would a user decide to call one instead 
of the other. The expectation here is that the "full-compute" path is more 
expensive than the "incremental-compute" path. So if the users motivation is to 
run this cheaper operation then falling back to the full compute could be 
misleading.
   Or from a different angle: if the incremental path is expected to try first 
with the cheap computation and then fall back to the more expensive one, then 
what would be the point of having the `compute_partition_stats` procedure to 
execute "full-compute". Why would one call it? So in general I'm in favour of 
throwing an exception if the incremental computation is not feasible.
   
   Well, if the plan is to have that single Spark procedure for both 
approaches, then everything I wrote above is irrelevant :)



##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
+    StructType partitionType = Partitioning.partitionType(table);
+
+    Collection<PartitionStats> stats;
+    if (recompute) {
+      stats = PartitionStatsUtil.computeStats(table, snapshot);
+    } else {
+      stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
     if (stats.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static Collection<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws 
IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
+          "Previous stats not found for incremental compute. Try full 
compute");
+    }
+
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+    // read previous stats, note that partition field will be read as 
GenericRecord
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), 
Files.localInput(statisticsFile.path()))) {
+      oldStats.forEach(
+          partitionStats ->
+              statsMap.put(partitionStats.specId(), 
partitionStats.partition(), partitionStats));
+    }
+
+    // incrementally compute the new stats, partition field will be written as 
PartitionData
+    PartitionMap<PartitionStats> incrementalStatsMap =
+        PartitionStatsUtil.computeStatsIncremental(
+            table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+    // convert PartitionData into GenericRecord and merge stats
+    incrementalStatsMap.forEach(
+        (key, value) ->
+            statsMap.merge(
+                Pair.of(key.first(), partitionDataToRecord((PartitionData) 
key.second())),
+                value,
+                (existingEntry, newEntry) -> {
+                  existingEntry.appendStats(newEntry);
+                  return existingEntry;
+                }));
+
+    return statsMap.values();
+  }
+
+  private static GenericRecord partitionDataToRecord(PartitionData data) {
+    GenericRecord record = GenericRecord.create(data.getPartitionType());
+    for (int index = 0; index < record.size(); index++) {
+      record.set(index, data.get(index));
+    }
+
+    return record;
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) 
{
+    List<PartitionStatisticsFile> partitionStatisticsFiles = 
table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;

Review Comment:
   Not sure if this was discussed, but I think returning `null` here is not 
consistent with throwing an exception at L249. I'd prefer throwing an exception 
here too. On the callsite we anyway throw an exception.



##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile 
computeAndWriteStatsFile(Table table) thro
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition 
stats file till the
+   * given snapshot and writes the combined result into a {@link 
PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is 
computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
+    StructType partitionType = Partitioning.partitionType(table);
+
+    Collection<PartitionStats> stats;
+    if (recompute) {
+      stats = PartitionStatsUtil.computeStats(table, snapshot);
+    } else {
+      stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
     if (stats.isEmpty()) {
       return null;
     }
 
-    StructType partitionType = Partitioning.partitionType(table);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
+  private static Collection<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws 
IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
+          "Previous stats not found for incremental compute. Try full 
compute");
+    }
+
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+    // read previous stats, note that partition field will be read as 
GenericRecord
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), 
Files.localInput(statisticsFile.path()))) {
+      oldStats.forEach(
+          partitionStats ->
+              statsMap.put(partitionStats.specId(), 
partitionStats.partition(), partitionStats));
+    }
+
+    // incrementally compute the new stats, partition field will be written as 
PartitionData
+    PartitionMap<PartitionStats> incrementalStatsMap =
+        PartitionStatsUtil.computeStatsIncremental(
+            table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+    // convert PartitionData into GenericRecord and merge stats
+    incrementalStatsMap.forEach(
+        (key, value) ->
+            statsMap.merge(
+                Pair.of(key.first(), partitionDataToRecord((PartitionData) 
key.second())),
+                value,
+                (existingEntry, newEntry) -> {
+                  existingEntry.appendStats(newEntry);
+                  return existingEntry;
+                }));
+
+    return statsMap.values();
+  }
+
+  private static GenericRecord partitionDataToRecord(PartitionData data) {
+    GenericRecord record = GenericRecord.create(data.getPartitionType());
+    for (int index = 0; index < record.size(); index++) {
+      record.set(index, data.get(index));
+    }
+
+    return record;
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) 
{
+    List<PartitionStatisticsFile> partitionStatisticsFiles = 
table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;
+    }
+
+    Map<Long, PartitionStatisticsFile> stats =
+        partitionStatisticsFiles.stream()
+            .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, 
file -> file));
+    for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, 
table::snapshot)) {

Review Comment:
   Thunking out loud here: What if we have a long list of snapshots and only 
the very first one has stats? Then we practically do an almost full recompute 
of stats in case we execute this for the latest one. Isn't it misleading in 
terms of expected performance? I mean if the users decised to run the 
incremental computation, they'd expect it to be cheaper than the full 
computation but in this case it isn't. I know, this is an edge case, not sure 
if we should do anything about it.
   Maybe having a threshold config for the number of ancestors we'd want to 
visit? Just asking if it makes sense at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to