rdblue commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1097930735
##########
api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java:
##########
@@ -44,4 +57,85 @@ public Type type() {
       return term().type();
     }
   }
+
+  public String columnName() {
+    if (op() == Operation.COUNT_STAR) {
+      return "*";
+    } else {
+      return ref().name();
+    }
+  }
+
+  public String describe() {
+    switch (op()) {
+      case COUNT_STAR:
+        return "count(*)";
+      case COUNT:
+        return "count(" + ExpressionUtil.describe(term()) + ")";
+      case MAX:
+        return "max(" + ExpressionUtil.describe(term()) + ")";
+      case MIN:
+        return "min(" + ExpressionUtil.describe(term()) + ")";
+      default:
+        throw new UnsupportedOperationException("Unsupported aggregate type: " + op());
+    }
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key) {
+    return safeGet(map, key, null);
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {
+    if (map != null) {
+      return map.getOrDefault(key, defaultValue);
+    }
+
+    return null;
+  }
+
+  interface Aggregator<R> {
+    void update(StructLike struct);
+
+    void update(DataFile file);
+
+    R result();
+  }
+
+  abstract static class NullSafeAggregator<T, R> implements Aggregator<R> {
+    private final BoundAggregate<T, R> aggregate;
+    private boolean isNull = false;
+
+    NullSafeAggregator(BoundAggregate<T, R> aggregate) {
+      this.aggregate = aggregate;
+    }
+
+    protected abstract void update(R value);
+
+    protected abstract R current();
+
+    @Override
+    public void update(StructLike struct) {
+      if (!isNull) {
+        R value = aggregate.eval(struct);
+        update(value);
+      }
+    }
+
+    @Override
+    public void update(DataFile file) {
+      if (!isNull) {
+        R value = aggregate.eval(file);
+        update(value);

Review Comment:
I see. In that case, I think we need to change `isNull` to `hasValue` and return a boolean from `update(R)`. The intent here was to signal when there is not enough information to produce a value.
When there isn't, the result value should be `null`, and we can skip pulling values out of rows or data files because we don't have enough information. For example, if we are processing 3 Parquet files and 1 Avro file, the Avro file may not have a max value. Rather than giving a partial max from the 3 Parquet files, we need `update(avroFile)` to result in `hasValue = false` so that we stop aggregating.

You're right that this needs to change from my original version, which assumed any null value signaled that there was no maximum. If we know that a file contains only null values, then we can skip it even if it doesn't have an upper bound. Similarly, if we get a null value from a row then we can skip it.
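
To make the proposed `hasValue` semantics concrete, here is a minimal standalone sketch. It is not the code in this PR: `HasValueSketch`, `FileStats`, `MaxAggregator`, and `maxOf` are hypothetical names, and the file metadata is reduced to an optional upper bound plus value and null counts. It only illustrates the three cases described above: a usable bound, a file with only nulls that can be skipped, and a file with too little information.

```java
import java.util.List;

public class HasValueSketch {
  /** Hypothetical stand-in for per-file metrics; not Iceberg's DataFile. */
  static final class FileStats {
    final Integer upperBound; // null when the file recorded no max value
    final long valueCount;
    final long nullCount;

    FileStats(Integer upperBound, long valueCount, long nullCount) {
      this.upperBound = upperBound;
      this.valueCount = valueCount;
      this.nullCount = nullCount;
    }

    boolean containsOnlyNulls() {
      return valueCount == nullCount;
    }
  }

  /** Simplified max aggregator that tracks whether a result can still be produced. */
  static final class MaxAggregator {
    private boolean hasValue = true;
    private Integer max = null;

    /** Returns false once there is not enough information to produce a max. */
    boolean update(FileStats file) {
      if (!hasValue) {
        return false;
      }
      if (file.containsOnlyNulls()) {
        return true; // nothing to contribute, but the aggregate is still valid
      }
      if (file.upperBound == null) {
        hasValue = false; // non-null values exist, but their max is unknown
        return false;
      }
      if (max == null || file.upperBound > max) {
        max = file.upperBound;
      }
      return true;
    }

    /** Null means "unknown", not a partial max. */
    Integer result() {
      return hasValue ? max : null;
    }
  }

  static Integer maxOf(List<FileStats> files) {
    MaxAggregator agg = new MaxAggregator();
    for (FileStats file : files) {
      if (!agg.update(file)) {
        break; // stop aggregating as soon as the result is unknown
      }
    }
    return agg.result();
  }
}
```

With three files that have bounds and one that has non-null values but no bound, `maxOf` returns `null` rather than a partial max. If the file without a bound contains only nulls, it is skipped and the max of the remaining files is returned.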