gaborkaszab commented on code in PR #12629: URL: https://github.com/apache/iceberg/pull/12629#discussion_r2027152279
########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
    */
   public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
       throws IOException {
+    return computeAndWrite(table, snapshotId, true /* recompute */);
+  }
+
+  /**
+   * Incrementally computes the stats after the snapshot that has partition stats file till the
+   * given snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
+   * merging the stats.
+   *
+   * @param table The {@link Table} for which the partition statistics is computed.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
+   *     present.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+      Table table, long snapshotId) throws IOException {
+    return computeAndWrite(table, snapshotId, false /* recompute */);
+  }
+
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
```

Review Comment:
nit: in `PartitionStatsUtil` you call the extra flag `incremental`, while here you call it `recompute`. I'd go with `incremental` here too, for consistency.

########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
+    StructType partitionType = Partitioning.partitionType(table);
+
+    Collection<PartitionStats> stats;
+    if (recompute) {
+      stats = PartitionStatsUtil.computeStats(table, snapshot);
+    } else {
+      stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
     if (stats.isEmpty()) {
       return null;
     }

-    StructType partitionType = Partitioning.partitionType(table);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }

+  private static Collection<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
+          "Previous stats not found for incremental compute. Try full compute");
+    }
+
+    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+    // read previous stats, note that partition field will be read as GenericRecord
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), Files.localInput(statisticsFile.path()))) {
```

Review Comment:
Just brainstorming: if the referenced stats file doesn't exist, we throw some data-reading-specific exception here (I haven't found a test showing which exception, but probably I wasn't searching hard enough). What I have in mind is that we could wrap it into an exception that is specific to incremental stats computation, something like `IncrementalComputeStatsFailedException`. We could then include a member in this exception that holds the snapshot ID of `statisticsFile`. The caller could then decide to fix this broken state of the table/stats and invoke compute stats for that particular snapshot. LMK WDYT!

########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
+    try (CloseableIterable<PartitionStats> oldStats =
+        readPartitionStatsFile(schema(partitionType), Files.localInput(statisticsFile.path()))) {
+      oldStats.forEach(
+          partitionStats ->
+              statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
+    }
+
+    // incrementally compute the new stats, partition field will be written as PartitionData
+    PartitionMap<PartitionStats> incrementalStatsMap =
+        PartitionStatsUtil.computeStatsIncremental(
+            table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+    // convert PartitionData into GenericRecord and merge stats
+    incrementalStatsMap.forEach(
+        (key, value) ->
+            statsMap.merge(
+                Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())),
+                value,
+                (existingEntry, newEntry) -> {
+                  existingEntry.appendStats(newEntry);
+                  return existingEntry;
+                }));
+
+    return statsMap.values();
+  }
+
+  private static GenericRecord partitionDataToRecord(PartitionData data) {
+    GenericRecord record = GenericRecord.create(data.getPartitionType());
+    for (int index = 0; index < record.size(); index++) {
+      record.set(index, data.get(index));
+    }
+
+    return record;
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
+    List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;
+    }
+
+    Map<Long, PartitionStatisticsFile> stats =
+        partitionStatisticsFiles.stream()
+            .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
+    for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {
+      if (stats.containsKey(snapshot.snapshotId())) {
+        return stats.get(snapshot.snapshotId());
+      }
+    }
+
+    throw new RuntimeException(
```

Review Comment:
I'd introduce a dedicated exception instead of this generic one. See one of my comments above.

########## core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java: ##########
@@ -40,27 +44,48 @@ public class PartitionStatsUtil {
```diff
   private PartitionStatsUtil() {}

   /**
-   * Computes the partition stats for the given snapshot of the table.
+   * Fully computes the partition stats for the given snapshot of the table.
    *
    * @param table the table for which partition stats to be computed.
    * @param snapshot the snapshot for which partition stats is computed.
    * @return the collection of {@link PartitionStats}
    */
   public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
-    Preconditions.checkArgument(table != null, "table cannot be null");
-    Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
-    Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
+    Preconditions.checkArgument(snapshot != null, "Current snapshot cannot be null");

-    StructType partitionType = Partitioning.partitionType(table);
-    List<ManifestFile> manifests = snapshot.allManifests(table.io());
-    Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(manifests)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+    return collectStats(table, snapshot, file -> true, false /* incremental */).values();
+  }

-    return mergeStats(statsByManifest, table.specs());
+  /**
+   * Incrementally computes the partition stats after the given snapshot to current snapshot.
+   *
+   * @param table the table for which partition stats to be computed.
+   * @param fromSnapshot the snapshot after which partition stats is computed (exclusive).
+   * @param currentSnapshot the snapshot till which partition stats is computed (inclusive).
+   * @return the {@link PartitionMap} of {@link PartitionStats}
+   */
+  public static PartitionMap<PartitionStats> computeStatsIncremental(
+      Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) {
```

Review Comment:
nit: the name `currentSnapshot` might be misleading. Since we have a `fromSnapshot`, could this be `toSnapshot`, `targetSnapshot`, or something similar?

########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +142,114 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
+  private static PartitionStatisticsFile computeAndWrite(
+      Table table, long snapshotId, boolean recompute) throws IOException {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
-    if (stats.isEmpty()) {
+    StructType partitionType = Partitioning.partitionType(table);
+
+    PartitionMap<PartitionStats> resultStatsMap;
+    if (recompute) {
+      resultStatsMap = PartitionStatsUtil.computeStatsFull(table, snapshot);
+    } else {
+      resultStatsMap = incrementalComputeAndMerge(table, snapshot, partitionType);
+    }
+
+    if (resultStatsMap.isEmpty()) {
       return null;
     }

-    StructType partitionType = Partitioning.partitionType(table);
-    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
+    List<PartitionStats> sortedStats =
+        PartitionStatsUtil.sortStats(resultStatsMap.values(), partitionType);
     return writePartitionStatsFile(
         table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }

+  private static PartitionMap<PartitionStats> incrementalComputeAndMerge(
+      Table table, Snapshot snapshot, StructType partitionType) throws IOException {
+    PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
+    if (statisticsFile == null) {
+      throw new RuntimeException(
```

Review Comment:
I'm very late to this conversation, sorry about that :) I think we should talk a bit about how a user would use these APIs to compute stats; then we might be able to sort this disagreement out too.
I see [the other PR](https://github.com/apache/iceberg/pull/12451) that introduces a `compute_partition_stats` stored procedure for the "full-compute" path. I'd assume there would be another procedure, `incremental_compute_partition_stats` or similar, that executes the "incremental-compute" path. If my assumption is correct, I think the question is why a user would decide to call one instead of the other. The expectation is that the "full-compute" path is more expensive than the "incremental-compute" path. So if the user's motivation is to run the cheaper operation, falling back to the full compute could be misleading. Or, from a different angle: if the incremental path is expected to try the cheap computation first and then fall back to the more expensive one, what would be the point of having the `compute_partition_stats` procedure to execute the "full-compute"? Why would anyone call it? So in general I'm in favour of throwing an exception if the incremental computation is not feasible. Well, if the plan is to have that single Spark procedure for both approaches, then everything I wrote above is irrelevant :)

########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
+    List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;
```

Review Comment:
Not sure if this was discussed, but I think returning `null` here is not consistent with throwing an exception at L249. I'd prefer throwing an exception here too. At the call site we throw an exception anyway.
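The consistency point above can be illustrated with a small standalone sketch. It does not use Iceberg types: snapshot lineage is assumed to be a `Map<Long, Long>` from child to parent snapshot ID, and stats availability a `Set<Long>` of snapshot IDs. `IncrementalComputeStatsFailedException` is only the name floated in the brainstorming comment, not an existing class, and `latestStatsSnapshotId` is a hypothetical stand-in for `latestStatsFile`. That comment also suggested carrying the snapshot ID of `statisticsFile`; in the lookup failures shown here no stats file exists, so the sketch records the requested snapshot ID instead.

```java
import java.util.Map;
import java.util.Set;

/** Standalone sketch; lineage and stats availability are modeled with plain collections. */
public class LatestStatsLookup {

  /** Hypothetical exception named after the review suggestion; not an existing Iceberg class. */
  public static class IncrementalComputeStatsFailedException extends RuntimeException {
    private final long targetSnapshotId;

    public IncrementalComputeStatsFailedException(long targetSnapshotId, String message) {
      super(message);
      this.targetSnapshotId = targetSnapshotId;
    }

    /** The snapshot for which the incremental computation was requested. */
    public long targetSnapshotId() {
      return targetSnapshotId;
    }
  }

  /**
   * Returns the closest snapshot ID, starting with targetSnapshotId itself and walking parent
   * pointers, that has a partition stats file. Both failure cases throw the same exception
   * instead of mixing a null return with a throw.
   */
  public static long latestStatsSnapshotId(
      long targetSnapshotId, Map<Long, Long> parentIds, Set<Long> snapshotsWithStats) {
    if (snapshotsWithStats.isEmpty()) {
      throw new IncrementalComputeStatsFailedException(
          targetSnapshotId, "No partition stats files found. Try full compute");
    }

    Long current = targetSnapshotId;
    while (current != null) {
      if (snapshotsWithStats.contains(current)) {
        return current;
      }
      // A missing key means we walked past the oldest known ancestor.
      current = parentIds.get(current);
    }

    throw new IncrementalComputeStatsFailedException(
        targetSnapshotId,
        "No ancestor of snapshot " + targetSnapshotId + " has partition stats. Try full compute");
  }

  public static void main(String[] args) {
    // Lineage 1 <- 2 <- 3 (child -> parent); only snapshot 1 has stats.
    Map<Long, Long> parentIds = Map.of(2L, 1L, 3L, 2L);
    System.out.println(latestStatsSnapshotId(3L, parentIds, Set.of(1L)));

    try {
      latestStatsSnapshotId(3L, parentIds, Set.of());
    } catch (IncrementalComputeStatsFailedException e) {
      // The caller, not the library, decides whether to run a full compute instead.
      System.out.println("Incremental compute not possible for " + e.targetSnapshotId());
    }
  }
}
```

Running `main` prints `1` and then `Incremental compute not possible for 3`. With a single exception type, the call site has one failure mode to handle, whichever lookup step failed.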
########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
```diff
+  @VisibleForTesting
+  static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
+    List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
+    if (partitionStatisticsFiles.isEmpty()) {
+      return null;
+    }
+
+    Map<Long, PartitionStatisticsFile> stats =
+        partitionStatisticsFiles.stream()
+            .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
+    for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {
```

Review Comment:
Thinking out loud here: what if we have a long list of snapshots and only the very first one has stats? Then, if we execute this for the latest snapshot, we practically do an almost full recompute of the stats. Isn't that misleading in terms of expected performance? If users decide to run the incremental computation, they'd expect it to be cheaper than the full computation, but in this case it isn't. I know this is an edge case, and I'm not sure we should do anything about it. Maybe a threshold config for the number of ancestors we'd want to visit? Just asking if it makes sense at all.
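The threshold idea in the last comment could look roughly like the following standalone sketch. `maxAncestorsToVisit` is a hypothetical setting, not an existing Iceberg table property, and lineage is again modeled as a plain child-to-parent `Map<Long, Long>` rather than `SnapshotUtil.ancestorsOf`. The walk gives up after following `maxAncestorsToVisit` parent links, so stats that are too far back are treated like missing stats.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/** Standalone sketch of a bounded ancestor walk; not Iceberg code. */
public class BoundedStatsLookup {

  /**
   * Looks for a snapshot with stats, starting at targetSnapshotId and following at most
   * maxAncestorsToVisit parent links. An empty result means no usable stats within the threshold.
   */
  public static OptionalLong findStatsWithinThreshold(
      long targetSnapshotId,
      Map<Long, Long> parentIds,
      Set<Long> snapshotsWithStats,
      int maxAncestorsToVisit) {
    if (maxAncestorsToVisit < 0) {
      throw new IllegalArgumentException("maxAncestorsToVisit must be >= 0");
    }

    Long current = targetSnapshotId;
    int parentsVisited = 0;
    while (current != null) {
      if (snapshotsWithStats.contains(current)) {
        return OptionalLong.of(current);
      }
      if (parentsVisited == maxAncestorsToVisit) {
        // The nearest stats are too far back; an incremental compute would cost
        // roughly as much as a full one, so report "not found".
        return OptionalLong.empty();
      }
      current = parentIds.get(current);
      parentsVisited++;
    }

    return OptionalLong.empty(); // reached the root without finding stats
  }

  public static void main(String[] args) {
    // Lineage 1 <- 2 <- 3 <- 4 <- 5 (child -> parent); only the very first snapshot has stats.
    Map<Long, Long> parentIds = Map.of(2L, 1L, 3L, 2L, 4L, 3L, 5L, 4L);
    Set<Long> withStats = Set.of(1L);

    System.out.println(findStatsWithinThreshold(5L, parentIds, withStats, 3));
    System.out.println(findStatsWithinThreshold(5L, parentIds, withStats, 4));
  }
}
```

Returning `OptionalLong` leaves the choice between failing and running a full compute to the caller, which fits the earlier argument that an implicit fallback could be misleading. With the lineage in `main`, a threshold of 3 stops at snapshot 2 without finding stats, while a threshold of 4 reaches snapshot 1.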