deniskuzZ commented on code in PR #12629: URL: https://github.com/apache/iceberg/pull/12629#discussion_r2013978234
########## data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ########## @@ -149,6 +162,92 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long table, snapshot.snapshotId(), schema(partitionType), sortedStats); } + /** + * Incrementally computes the stats after the snapshot that has partition stats file till the + * given snapshot and writes the combined result into a {@link PartitionStatisticsFile} after + * merging the stats. + * + * @param table The {@link Table} for which the partition statistics is computed. + * @param snapshotId snapshot for which partition statistics are computed. + * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are + * present. + */ + public static PartitionStatisticsFile computeAndWriteStatsFileIncremental( + Table table, long snapshotId) throws IOException { + Preconditions.checkArgument(table != null, "Table cannot be null"); + Snapshot snapshot = table.snapshot(snapshotId); + Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId); + + StructType partitionType = Partitioning.partitionType(table); + Schema statsFileSchema = schema(partitionType); + PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshotId); + Collection<PartitionStats> stats; + if (statisticsFile == null) { + LOG.info("Previous stats not found. 
Computing the stats for whole table."); + stats = PartitionStatsUtil.computeStats(table, table.snapshot(snapshotId)); + } else { + PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); + // read previous stats, note that partition field will be read as GenericRecord + try (CloseableIterable<PartitionStats> oldStats = + readPartitionStatsFile(statsFileSchema, Files.localInput(statisticsFile.path()))) { + oldStats.forEach( + partitionStats -> + statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); + } + + // incrementally compute the new stats, here partition field will be written as PartitionData + PartitionMap<PartitionStats> incrementalStatsMap = + PartitionStatsUtil.computeStatsIncremental( + table, table.snapshot(statisticsFile.snapshotId()), table.snapshot(snapshotId)); + // convert PartitionData into GenericRecord and merge stats + incrementalStatsMap.forEach( + (key, value) -> + statsMap.merge( + Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())), + value, + (existingEntry, newEntry) -> { + existingEntry.appendStats(newEntry); + return existingEntry; + })); + stats = statsMap.values(); + } + + if (stats.isEmpty()) { + return null; + } + + List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType); + return writePartitionStatsFile(table, snapshotId, statsFileSchema, sortedStats); + } + + private static GenericRecord partitionDataToRecord(PartitionData data) { + GenericRecord record = GenericRecord.create(data.getPartitionType()); + for (int index = 0; index < record.size(); index++) { + record.set(index, data.get(index)); + } + + return record; + } + + private static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) { + List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles(); + if (partitionStatisticsFiles.isEmpty()) { + return null; + } + + Map<Long, PartitionStatisticsFile> stats = + 
partitionStatisticsFiles.stream()
+ .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
+ for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {

Review Comment:
sorry, my bad, I was looking at `oldestAncestorOf`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org