ajantha-bhat commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2016738451
##########
core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java:
##########
@@ -45,22 +49,53 @@ private PartitionStatsUtil() {}
* @param table the table for which partition stats to be computed.
* @param snapshot the snapshot for which partition stats is computed.
* @return the collection of {@link PartitionStats}
+ * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #computeStats(Table, Snapshot,
+ * Snapshot)} instead.
*/
+ @Deprecated
public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
- Preconditions.checkArgument(table != null, "table cannot be null");
- Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
- Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+ return computeStats(table, null, snapshot).values();
+ }
- StructType partitionType = Partitioning.partitionType(table);
- List<ManifestFile> manifests = snapshot.allManifests(table.io());
- Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
- Tasks.foreach(manifests)
- .stopOnFailure()
- .throwFailureWhenFinished()
- .executeWith(ThreadPools.getWorkerPool())
- .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
+ /**
+ * Computes the partition stats incrementally after the given snapshot to current snapshot. If the
+ * given snapshot is null, computes the stats completely instead of incrementally.
+ *
+ * @param table the table for which partition stats to be computed.
+ * @param fromSnapshot the snapshot after which partition stats is computed (exclusive).
+ * @param currentSnapshot the snapshot till which partition stats is computed (inclusive).
+ * @return the {@link PartitionMap} of {@link PartitionStats}
+ */
+ public static PartitionMap<PartitionStats> computeStats(
+ Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
+ Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
+ Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null");
- return mergeStats(statsByManifest, table.specs());
+ Predicate<ManifestFile> manifestFilePredicate = file -> true;
+ if (fromSnapshot != null) {
+ Preconditions.checkArgument(currentSnapshot != fromSnapshot, "Both the snapshots are same");
+ Preconditions.checkArgument(
+ SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), fromSnapshot.snapshotId()),
+ "Starting snapshot %s is not an ancestor of current snapshot %s",
+ fromSnapshot.snapshotId(),
+ currentSnapshot.snapshotId());
+ Set<Long> snapshotIdsRange =
+ Sets.newHashSet(
+ SnapshotUtil.ancestorIdsBetween(
+ currentSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot));
+ manifestFilePredicate =
+ manifestFile ->
+ snapshotIdsRange.contains(manifestFile.snapshotId())
+ && !manifestFile.hasExistingFiles();
Review Comment:
1) Compaction commits a snapshot with the REPLACE operation, so the old stats
can be reused in that scenario. A new stats file with the same data still has
to be written so that snapshot files can be garbage-collected cleanly.
Compaction will be tested end to end when the Spark procedure is added.
2) About live entries (existing + added):
For a full compute, old manifest files are marked as deleted and their entries
are carried over as existing in the new manifest files, which may also contain
additional added entries. So a full compute needs to consider both existing
and added entries.
For an incremental compute, the old stats file has some entries which are now
existing. So the existing entries should be considered.
This leads to the next question: what happens when a manifest is deleted?
In that case we just update the snapshot entry (last modified) and do not
decrement the stats. Hence, it should again be skipped for incremental compute.
All of this logic is in `collectStatsForManifest`, and the existing test cases
(full compute and incremental) cover it because they use `mergeAppend`, which
produces manifests with a mix of added and existing entries.
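
To make the incremental manifest filter from the diff above easier to reason about, here is a minimal, self-contained sketch of the same predicate. It does not use Iceberg: `Manifest` is a hypothetical stand-in for `ManifestFile` that models only the snapshot ID and the has-existing-files flag, and the snapshot IDs and file names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class IncrementalManifestFilter {

  // Hypothetical stand-in for ManifestFile; only the two properties the filter reads.
  static final class Manifest {
    final String path;
    final long snapshotId;
    final boolean hasExistingFiles;

    Manifest(String path, long snapshotId, boolean hasExistingFiles) {
      this.path = path;
      this.snapshotId = snapshotId;
      this.hasExistingFiles = hasExistingFiles;
    }
  }

  // Same shape as the predicate in the diff: keep a manifest only if it was
  // written by a snapshot in the range and it carries no EXISTING entries.
  static Predicate<Manifest> incrementalFilter(Set<Long> snapshotIdsRange) {
    return m -> snapshotIdsRange.contains(m.snapshotId) && !m.hasExistingFiles;
  }

  static List<String> selectedPaths(List<Manifest> manifests, Set<Long> snapshotIdsRange) {
    return manifests.stream()
        .filter(incrementalFilter(snapshotIdsRange))
        .map(m -> m.path)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Assumed history: snapshot 1 is fromSnapshot (exclusive), 3 is currentSnapshot (inclusive).
    Set<Long> range = new HashSet<>(Arrays.asList(2L, 3L));
    List<Manifest> manifests =
        Arrays.asList(
            new Manifest("m1.avro", 1L, false), // written before the range
            new Manifest("m2.avro", 2L, false), // in range, only added entries
            new Manifest("m3.avro", 3L, true), // in range, but has existing entries
            new Manifest("m4.avro", 3L, false)); // in range, only added entries
    System.out.println(selectedPaths(manifests, range));
  }
}
```

With this predicate, a manifest that contains any existing entries is dropped as a whole, including any added entries it may also hold, so the per-entry handling described in point 2 only applies to manifests that pass the filter.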
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.