gaborkaszab commented on code in PR #12629:
URL: https://github.com/apache/iceberg/pull/12629#discussion_r2027152279
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
Review Comment:
nit: in `PartitionStatsUtil` you call the extra flag `incremental` while you
call it `recompute` here. I'd go for `incremental` here too for consistency.
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
+ StructType partitionType = Partitioning.partitionType(table);
+
+ Collection<PartitionStats> stats;
+ if (recompute) {
+ stats = PartitionStatsUtil.computeStats(table, snapshot);
+ } else {
+ stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+ }
+
if (stats.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static Collection<PartitionStats> incrementalComputeAndMerge(
+ Table table, Snapshot snapshot, StructType partitionType) throws
IOException {
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ throw new RuntimeException(
+ "Previous stats not found for incremental compute. Try full
compute");
+ }
+
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as
GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType),
Files.localInput(statisticsFile.path()))) {
Review Comment:
Just brainstorming: if the referenced stats file doesn't exist, we throw some
data-reading-specific exception here (I haven't found a test showing which
exception, but I probably wasn't searching hard enough). What I have in mind is
that we could wrap it in an exception that is relevant to incremental stats
computation, like `IncrementalComputeStatsFailedException` or similar. We could
then include a member in this exception that holds the snapshot ID of
`statisticsFile`. The caller could then decide to fix this broken state of the
table/stats by invoking compute stats for that particular snapshot.
Let me know what you think!
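A minimal sketch of the wrapping idea, not tied to the actual Iceberg classes.
The exception name comes from the comment above; the `baseSnapshotId` field,
the `StatsReader` interface, and `readPreviousStats` are hypothetical stand-ins
for the real stats-file reading code:

```java
import java.io.IOException;

// Hypothetical exception type: carries the snapshot ID whose stats file could not be read.
class IncrementalComputeStatsFailedException extends RuntimeException {
  private final long baseSnapshotId;

  IncrementalComputeStatsFailedException(String message, long baseSnapshotId, Throwable cause) {
    super(message, cause);
    this.baseSnapshotId = baseSnapshotId;
  }

  long baseSnapshotId() {
    return baseSnapshotId;
  }
}

class IncrementalStatsSketch {
  // Hypothetical stand-in for reading the previous partition stats file.
  interface StatsReader {
    long read(long snapshotId) throws IOException;
  }

  // Wraps any read failure so the caller learns which snapshot's stats are broken.
  static long readPreviousStats(long statsSnapshotId, StatsReader reader) {
    try {
      return reader.read(statsSnapshotId);
    } catch (IOException | RuntimeException e) {
      throw new IncrementalComputeStatsFailedException(
          "Failed to read previous stats for snapshot " + statsSnapshotId,
          statsSnapshotId,
          e);
    }
  }
}
```

A caller could catch this exception, read `baseSnapshotId()`, and run a full
compute for that snapshot before retrying the incremental path.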
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
+ StructType partitionType = Partitioning.partitionType(table);
+
+ Collection<PartitionStats> stats;
+ if (recompute) {
+ stats = PartitionStatsUtil.computeStats(table, snapshot);
+ } else {
+ stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+ }
+
if (stats.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static Collection<PartitionStats> incrementalComputeAndMerge(
+ Table table, Snapshot snapshot, StructType partitionType) throws
IOException {
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ throw new RuntimeException(
+ "Previous stats not found for incremental compute. Try full
compute");
+ }
+
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as
GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType),
Files.localInput(statisticsFile.path()))) {
+ oldStats.forEach(
+ partitionStats ->
+ statsMap.put(partitionStats.specId(),
partitionStats.partition(), partitionStats));
+ }
+
+ // incrementally compute the new stats, partition field will be written as
PartitionData
+ PartitionMap<PartitionStats> incrementalStatsMap =
+ PartitionStatsUtil.computeStatsIncremental(
+ table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+ // convert PartitionData into GenericRecord and merge stats
+ incrementalStatsMap.forEach(
+ (key, value) ->
+ statsMap.merge(
+ Pair.of(key.first(), partitionDataToRecord((PartitionData)
key.second())),
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
+
+ return statsMap.values();
+ }
+
+ private static GenericRecord partitionDataToRecord(PartitionData data) {
+ GenericRecord record = GenericRecord.create(data.getPartitionType());
+ for (int index = 0; index < record.size(); index++) {
+ record.set(index, data.get(index));
+ }
+
+ return record;
+ }
+
+ @VisibleForTesting
+ static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId)
{
+ List<PartitionStatisticsFile> partitionStatisticsFiles =
table.partitionStatisticsFiles();
+ if (partitionStatisticsFiles.isEmpty()) {
+ return null;
+ }
+
+ Map<Long, PartitionStatisticsFile> stats =
+ partitionStatisticsFiles.stream()
+ .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId,
file -> file));
+ for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId,
table::snapshot)) {
+ if (stats.containsKey(snapshot.snapshotId())) {
+ return stats.get(snapshot.snapshotId());
+ }
+ }
+
+ throw new RuntimeException(
Review Comment:
I'd introduce a new exception instead of this general one. See one of my
comments above.
##########
core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java:
##########
@@ -40,27 +44,48 @@ public class PartitionStatsUtil {
private PartitionStatsUtil() {}
/**
- * Computes the partition stats for the given snapshot of the table.
+ * Fully computes the partition stats for the given snapshot of the table.
*
* @param table the table for which partition stats to be computed.
* @param snapshot the snapshot for which partition stats is computed.
* @return the collection of {@link PartitionStats}
*/
public static Collection<PartitionStats> computeStats(Table table, Snapshot
snapshot) {
- Preconditions.checkArgument(table != null, "table cannot be null");
- Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must
be partitioned");
- Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
+ Preconditions.checkArgument(table != null, "Table cannot be null");
+ Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must
be partitioned");
+ Preconditions.checkArgument(snapshot != null, "Current snapshot cannot be
null");
- StructType partitionType = Partitioning.partitionType(table);
- List<ManifestFile> manifests = snapshot.allManifests(table.io());
- Queue<PartitionMap<PartitionStats>> statsByManifest =
Queues.newConcurrentLinkedQueue();
- Tasks.foreach(manifests)
- .stopOnFailure()
- .throwFailureWhenFinished()
- .executeWith(ThreadPools.getWorkerPool())
- .run(manifest -> statsByManifest.add(collectStats(table, manifest,
partitionType)));
+ return collectStats(table, snapshot, file -> true, false /* incremental
*/).values();
+ }
- return mergeStats(statsByManifest, table.specs());
+ /**
+ * Incrementally computes the partition stats after the given snapshot to
current snapshot.
+ *
+ * @param table the table for which partition stats to be computed.
+ * @param fromSnapshot the snapshot after which partition stats is computed
(exclusive).
+ * @param currentSnapshot the snapshot till which partition stats is
computed (inclusive).
+ * @return the {@link PartitionMap} of {@link PartitionStats}
+ */
+ public static PartitionMap<PartitionStats> computeStatsIncremental(
+ Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) {
Review Comment:
nit: the name `currentSnapshot` might be misleading. Since we have a
`fromSnapshot` can this be a `toSnapshot` or `targetSnapshot` or something
similar?
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +142,114 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
- if (stats.isEmpty()) {
+ StructType partitionType = Partitioning.partitionType(table);
+
+ PartitionMap<PartitionStats> resultStatsMap;
+ if (recompute) {
+ resultStatsMap = PartitionStatsUtil.computeStatsFull(table, snapshot);
+ } else {
+ resultStatsMap = incrementalComputeAndMerge(table, snapshot,
partitionType);
+ }
+
+ if (resultStatsMap.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
- List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
+ List<PartitionStats> sortedStats =
+ PartitionStatsUtil.sortStats(resultStatsMap.values(), partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static PartitionMap<PartitionStats> incrementalComputeAndMerge(
+ Table table, Snapshot snapshot, StructType partitionType) throws
IOException {
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ throw new RuntimeException(
Review Comment:
I'm very late to this conversation, sorry about that :) I think we should
talk a bit about how a user would use these APIs to compute stats and then we
might be able to sort this disagreement out too.
I see [the other PR](https://github.com/apache/iceberg/pull/12451) to
introduce a `compute_partition_stats` stored proc for the "full-compute" path.
I'd assume there would be another proc, `incremental_compute_partition_stats`
or similar that will execute the "incremental-compute" path. If my assumption
is correct, I think the question is why would a user decide to call one instead
of the other. The expectation here is that the "full-compute" path is more
expensive than the "incremental-compute" path. So if the user's motivation is to
run this cheaper operation, then falling back to the full compute could be
misleading.
Or from a different angle: if the incremental path is expected to try first
with the cheap computation and then fall back to the more expensive one, then
what would be the point of having the `compute_partition_stats` procedure to
execute "full-compute". Why would one call it? So in general I'm in favour of
throwing an exception if the incremental computation is not feasible.
Well, if the plan is to have that single Spark procedure for both
approaches, then everything I wrote above is irrelevant :)
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
+ StructType partitionType = Partitioning.partitionType(table);
+
+ Collection<PartitionStats> stats;
+ if (recompute) {
+ stats = PartitionStatsUtil.computeStats(table, snapshot);
+ } else {
+ stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+ }
+
if (stats.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static Collection<PartitionStats> incrementalComputeAndMerge(
+ Table table, Snapshot snapshot, StructType partitionType) throws
IOException {
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ throw new RuntimeException(
+ "Previous stats not found for incremental compute. Try full
compute");
+ }
+
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as
GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType),
Files.localInput(statisticsFile.path()))) {
+ oldStats.forEach(
+ partitionStats ->
+ statsMap.put(partitionStats.specId(),
partitionStats.partition(), partitionStats));
+ }
+
+ // incrementally compute the new stats, partition field will be written as
PartitionData
+ PartitionMap<PartitionStats> incrementalStatsMap =
+ PartitionStatsUtil.computeStatsIncremental(
+ table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+ // convert PartitionData into GenericRecord and merge stats
+ incrementalStatsMap.forEach(
+ (key, value) ->
+ statsMap.merge(
+ Pair.of(key.first(), partitionDataToRecord((PartitionData)
key.second())),
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
+
+ return statsMap.values();
+ }
+
+ private static GenericRecord partitionDataToRecord(PartitionData data) {
+ GenericRecord record = GenericRecord.create(data.getPartitionType());
+ for (int index = 0; index < record.size(); index++) {
+ record.set(index, data.get(index));
+ }
+
+ return record;
+ }
+
+ @VisibleForTesting
+ static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId)
{
+ List<PartitionStatisticsFile> partitionStatisticsFiles =
table.partitionStatisticsFiles();
+ if (partitionStatisticsFiles.isEmpty()) {
+ return null;
Review Comment:
Not sure if this was discussed, but I think returning `null` here is not
consistent with throwing an exception at L249. I'd prefer throwing an exception
here too, since the call site throws an exception anyway.
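A minimal sketch of a lookup that throws in both cases, assuming plain snapshot
IDs instead of the real `PartitionStatisticsFile` and `Snapshot` types. The
class name, method name, and the choice of `IllegalStateException` are
illustrative only:

```java
import java.util.List;

class LatestStatsLookupSketch {
  // Returns the first ancestor (nearest first) that has a stats file.
  // Throws in both failure cases, so the caller never needs a null check.
  static long latestSnapshotWithStats(List<Long> snapshotsWithStats, List<Long> ancestors) {
    if (snapshotsWithStats.isEmpty()) {
      throw new IllegalStateException("No partition stats files are registered for the table");
    }

    for (long ancestorId : ancestors) {
      if (snapshotsWithStats.contains(ancestorId)) {
        return ancestorId;
      }
    }

    throw new IllegalStateException("No ancestor snapshot has a partition stats file");
  }
}
```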
##########
data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -135,20 +143,113 @@ public static PartitionStatisticsFile
computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table,
long snapshotId)
throws IOException {
+ return computeAndWrite(table, snapshotId, true /* recompute */);
+ }
+
+ /**
+ * Incrementally computes the stats after the snapshot that has partition
stats file till the
+ * given snapshot and writes the combined result into a {@link
PartitionStatisticsFile} after
+ * merging the stats.
+ *
+ * @param table The {@link Table} for which the partition statistics is
computed.
+ * @param snapshotId snapshot for which partition statistics are computed.
+ * @return {@link PartitionStatisticsFile} for the given snapshot, or null
if no statistics are
+ * present.
+ */
+ public static PartitionStatisticsFile computeAndWriteStatsFileIncremental(
+ Table table, long snapshotId) throws IOException {
+ return computeAndWrite(table, snapshotId, false /* recompute */);
+ }
+
+ private static PartitionStatisticsFile computeAndWrite(
+ Table table, long snapshotId, boolean recompute) throws IOException {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table,
snapshot);
+ StructType partitionType = Partitioning.partitionType(table);
+
+ Collection<PartitionStats> stats;
+ if (recompute) {
+ stats = PartitionStatsUtil.computeStats(table, snapshot);
+ } else {
+ stats = incrementalComputeAndMerge(table, snapshot, partitionType);
+ }
+
if (stats.isEmpty()) {
return null;
}
- StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats,
partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
}
+ private static Collection<PartitionStats> incrementalComputeAndMerge(
+ Table table, Snapshot snapshot, StructType partitionType) throws
IOException {
+ PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
+ if (statisticsFile == null) {
+ throw new RuntimeException(
+ "Previous stats not found for incremental compute. Try full
compute");
+ }
+
+ PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
+ // read previous stats, note that partition field will be read as
GenericRecord
+ try (CloseableIterable<PartitionStats> oldStats =
+ readPartitionStatsFile(schema(partitionType),
Files.localInput(statisticsFile.path()))) {
+ oldStats.forEach(
+ partitionStats ->
+ statsMap.put(partitionStats.specId(),
partitionStats.partition(), partitionStats));
+ }
+
+ // incrementally compute the new stats, partition field will be written as
PartitionData
+ PartitionMap<PartitionStats> incrementalStatsMap =
+ PartitionStatsUtil.computeStatsIncremental(
+ table, table.snapshot(statisticsFile.snapshotId()), snapshot);
+
+ // convert PartitionData into GenericRecord and merge stats
+ incrementalStatsMap.forEach(
+ (key, value) ->
+ statsMap.merge(
+ Pair.of(key.first(), partitionDataToRecord((PartitionData)
key.second())),
+ value,
+ (existingEntry, newEntry) -> {
+ existingEntry.appendStats(newEntry);
+ return existingEntry;
+ }));
+
+ return statsMap.values();
+ }
+
+ private static GenericRecord partitionDataToRecord(PartitionData data) {
+ GenericRecord record = GenericRecord.create(data.getPartitionType());
+ for (int index = 0; index < record.size(); index++) {
+ record.set(index, data.get(index));
+ }
+
+ return record;
+ }
+
+ @VisibleForTesting
+ static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId)
{
+ List<PartitionStatisticsFile> partitionStatisticsFiles =
table.partitionStatisticsFiles();
+ if (partitionStatisticsFiles.isEmpty()) {
+ return null;
+ }
+
+ Map<Long, PartitionStatisticsFile> stats =
+ partitionStatisticsFiles.stream()
+ .collect(Collectors.toMap(PartitionStatisticsFile::snapshotId,
file -> file));
+ for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId,
table::snapshot)) {
Review Comment:
Thinking out loud here: what if we have a long list of snapshots and only
the very first one has stats? Then we practically do an almost full recompute
of stats if we execute this for the latest one. Isn't that misleading in
terms of expected performance? I mean, if users decide to run the
incremental computation, they'd expect it to be cheaper than the full
computation, but in this case it isn't. I know this is an edge case; I'm not
sure if we should do anything about it.
Maybe we could have a threshold config for the number of ancestors we'd want to
visit? Just asking if it makes sense at all.
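A minimal sketch of such a threshold, assuming the snapshot lineage is given as
a plain parent map rather than through `SnapshotUtil.ancestorsOf`. The
`maxAncestors` parameter and all names below are hypothetical:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class AncestorSearchSketch {
  // Walks from startId towards the root, checking at most maxAncestors snapshots
  // (the starting snapshot counts as the first one visited).
  // parentOf maps a snapshot ID to its parent ID; the root has no entry.
  static Optional<Long> latestSnapshotWithStats(
      long startId, Map<Long, Long> parentOf, Set<Long> withStats, int maxAncestors) {
    Long current = startId;
    for (int visited = 0; current != null && visited < maxAncestors; visited++) {
      if (withStats.contains(current)) {
        return Optional.of(current);
      }

      current = parentOf.get(current);
    }

    // Threshold reached or lineage exhausted: the caller decides what to do next.
    return Optional.empty();
  }
}
```

With an empty result, the caller could throw or suggest a full compute instead
of silently doing near-full work on the incremental path.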
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]