deniskuzZ commented on code in PR #12629: URL: https://github.com/apache/iceberg/pull/12629#discussion_r2016520551
########## core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java: ########## @@ -45,22 +49,53 @@ private PartitionStatsUtil() {} * @param table the table for which partition stats to be computed. * @param snapshot the snapshot for which partition stats is computed. * @return the collection of {@link PartitionStats} + * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #computeStats(Table, Snapshot, + * Snapshot)} instead. */ + @Deprecated public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) { - Preconditions.checkArgument(table != null, "table cannot be null"); - Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned"); - Preconditions.checkArgument(snapshot != null, "snapshot cannot be null"); + return computeStats(table, null, snapshot).values(); + } - StructType partitionType = Partitioning.partitionType(table); - List<ManifestFile> manifests = snapshot.allManifests(table.io()); - Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue(); - Tasks.foreach(manifests) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(ThreadPools.getWorkerPool()) - .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType))); + /** + * Computes the partition stats incrementally after the given snapshot to current snapshot. If the + * given snapshot is null, computes the stats completely instead of incrementally. + * + * @param table the table for which partition stats to be computed. + * @param fromSnapshot the snapshot after which partition stats is computed (exclusive). + * @param currentSnapshot the snapshot till which partition stats is computed (inclusive). 
+ * @return the {@link PartitionMap} of {@link PartitionStats} + */ + public static PartitionMap<PartitionStats> computeStats( + Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) { + Preconditions.checkArgument(table != null, "Table cannot be null"); + Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned"); + Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null"); - return mergeStats(statsByManifest, table.specs()); + Predicate<ManifestFile> manifestFilePredicate = file -> true; + if (fromSnapshot != null) { + Preconditions.checkArgument(currentSnapshot != fromSnapshot, "Both the snapshots are same"); + Preconditions.checkArgument( + SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), fromSnapshot.snapshotId()), + "Starting snapshot %s is not an ancestor of current snapshot %s", + fromSnapshot.snapshotId(), + currentSnapshot.snapshotId()); + Set<Long> snapshotIdsRange = + Sets.newHashSet( + SnapshotUtil.ancestorIdsBetween( + currentSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot)); + manifestFilePredicate = + manifestFile -> + snapshotIdsRange.contains(manifestFile.snapshotId()) + && !manifestFile.hasExistingFiles(); Review Comment: Are we dropping snapshot stats on expiry? (S1 + stats -> S2 + stats -> S3), then expire S1 and compute stats S3 ########## core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java: ########## @@ -45,22 +49,53 @@ private PartitionStatsUtil() {} * @param table the table for which partition stats to be computed. * @param snapshot the snapshot for which partition stats is computed. * @return the collection of {@link PartitionStats} + * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #computeStats(Table, Snapshot, + * Snapshot)} instead. 
*/ + @Deprecated public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) { - Preconditions.checkArgument(table != null, "table cannot be null"); - Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned"); - Preconditions.checkArgument(snapshot != null, "snapshot cannot be null"); + return computeStats(table, null, snapshot).values(); + } - StructType partitionType = Partitioning.partitionType(table); - List<ManifestFile> manifests = snapshot.allManifests(table.io()); - Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue(); - Tasks.foreach(manifests) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(ThreadPools.getWorkerPool()) - .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType))); + /** + * Computes the partition stats incrementally after the given snapshot to current snapshot. If the + * given snapshot is null, computes the stats completely instead of incrementally. + * + * @param table the table for which partition stats to be computed. + * @param fromSnapshot the snapshot after which partition stats is computed (exclusive). + * @param currentSnapshot the snapshot till which partition stats is computed (inclusive). 
+ * @return the {@link PartitionMap} of {@link PartitionStats} + */ + public static PartitionMap<PartitionStats> computeStats( + Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) { + Preconditions.checkArgument(table != null, "Table cannot be null"); + Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned"); + Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null"); - return mergeStats(statsByManifest, table.specs()); + Predicate<ManifestFile> manifestFilePredicate = file -> true; + if (fromSnapshot != null) { + Preconditions.checkArgument(currentSnapshot != fromSnapshot, "Both the snapshots are same"); + Preconditions.checkArgument( + SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), fromSnapshot.snapshotId()), + "Starting snapshot %s is not an ancestor of current snapshot %s", + fromSnapshot.snapshotId(), + currentSnapshot.snapshotId()); + Set<Long> snapshotIdsRange = + Sets.newHashSet( + SnapshotUtil.ancestorIdsBetween( + currentSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot)); + manifestFilePredicate = + manifestFile -> + snapshotIdsRange.contains(manifestFile.snapshotId()) + && !manifestFile.hasExistingFiles(); Review Comment: Are we dropping snapshot stats on expiry? (S1 + stats -> S2 + stats -> S3), then expire S1 and compute stats S3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org