pvary commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2016416364
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +146,112 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
- if (stats.isEmpty()) {
+ StructType partitionType = Partitioning.partitionType(table);
+ PartitionMap<PartitionStats> resultStatsMap =
+ computeStats(table, snapshot, partitionType, recompute);
+ if (resultStatsMap.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
- List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
+ List<PartitionStats> sortedStats =
+ PartitionStatsUtil.sortStats(resultStatsMap.values(), partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static PartitionMap<PartitionStats> computeStats(
+ Table table, Snapshot snapshot, StructType partitionType, boolean
recompute)
+ throws IOException {
+ if (recompute) {
+ return PartitionStatsUtil.computeStats(table, null, snapshot);
+ }
+
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ LOG.info("Previous stats not found. Computing the stats for whole
table.");
+ return PartitionStatsUtil.computeStats(table, null, snapshot);
+ }
+
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as
GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType),
Files.localInput(statisticsFile.path()))) {
+ oldStats.forEach(
+ partitionStats ->
+ statsMap.put(partitionStats.specId(),
partitionStats.partition(), partitionStats));
+ }
+
+ // incrementally compute the new stats, partition field will be written as
PartitionData
+ PartitionMap<PartitionStats> incrementalStatsMap =
+ PartitionStatsUtil.computeStats(
+ table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+ // convert PartitionData into GenericRecord and merge stats
+ incrementalStatsMap.forEach(
+ (key, value) ->
+ statsMap.merge(
+ Pair.of(key.first(), partitionDataToRecord((PartitionData)
key.second())),
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
+
+ return statsMap;
+ }
+
+ private static GenericRecord partitionDataToRecord(PartitionData data) {
+ GenericRecord record = GenericRecord.create(data.getPartitionType());
+ for (int index = 0; index < record.size(); index++) {
+ record.set(index, data.get(index));
+ }
+
+ return record;
+ }
+
+ private static PartitionStatisticsFile latestStatsFile(Table table, long
snapshotId) {
Review Comment:
Do we have a test for this code?
It is not too complicated, but it is non-trivial. Maybe at least a test which
depends on this logic would be good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]