rdblue commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1097930735


##########
api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java:
##########
@@ -44,4 +57,85 @@ public Type type() {
       return term().type();
     }
   }
+
+  public String columnName() {
+    if (op() == Operation.COUNT_STAR) {
+      return "*";
+    } else {
+      return ref().name();
+    }
+  }
+
+  public String describe() {
+    switch (op()) {
+      case COUNT_STAR:
+        return "count(*)";
+      case COUNT:
+        return "count(" + ExpressionUtil.describe(term()) + ")";
+      case MAX:
+        return "max(" + ExpressionUtil.describe(term()) + ")";
+      case MIN:
+        return "min(" + ExpressionUtil.describe(term()) + ")";
+      default:
+        throw new UnsupportedOperationException("Unsupported aggregate type: " + op());
+    }
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key) {
+    return safeGet(map, key, null);
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {
+    if (map != null) {
+      return map.getOrDefault(key, defaultValue);
+    }
+
+    return null;
+  }
+
+  interface Aggregator<R> {
+    void update(StructLike struct);
+
+    void update(DataFile file);
+
+    R result();
+  }
+
+  abstract static class NullSafeAggregator<T, R> implements Aggregator<R> {
+    private final BoundAggregate<T, R> aggregate;
+    private boolean isNull = false;
+
+    NullSafeAggregator(BoundAggregate<T, R> aggregate) {
+      this.aggregate = aggregate;
+    }
+
+    protected abstract void update(R value);
+
+    protected abstract R current();
+
+    @Override
+    public void update(StructLike struct) {
+      if (!isNull) {
+        R value = aggregate.eval(struct);
+        update(value);
+      }
+    }
+
+    @Override
+    public void update(DataFile file) {
+      if (!isNull) {
+        R value = aggregate.eval(file);
+        update(value);

Review Comment:
   I see. In that case, I think we need to change `isNull` to `hasValue` and 
return a boolean from `update(R)`.
   
   The intent here was to signal when there is not enough information to 
produce a value. When there isn't, then the result value should be `null`, and 
we can skip pulling values out of rows or data files because we don't have 
enough information.
   
   For example, if we are processing 3 Parquet files and 1 Avro file, the Avro 
file may not have a max value. Rather than giving a partial max from the 3 
Parquet files, we need `update(avroFile)` to set `hasValue = false` and return 
`false` so that we stop aggregating.
   
   You're right that this needs to change from my original version, which 
assumed any null value signaled that there was no maximum. If we know that a 
file contains only null values, then we can skip it even if it doesn't have an 
upper bound. Similarly, if we get a null value from a row then we can skip it.
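
   A minimal sketch of what I have in mind, assuming a simplified stand-in for 
file metrics. `FileStats`, `MaxSketch`, and `maxOf` are hypothetical names for 
illustration only, not the Iceberg `DataFile` API or the classes in this PR:

```java
import java.util.List;

// Hypothetical stand-in for per-file column metrics (not the Iceberg DataFile API).
final class FileStats {
  final Integer upperBound; // null when the file recorded no upper bound
  final boolean allNulls;   // true when the column is known to contain only nulls

  FileStats(Integer upperBound, boolean allNulls) {
    this.upperBound = upperBound;
    this.allNulls = allNulls;
  }
}

// Hypothetical max aggregator that tracks hasValue instead of isNull.
final class MaxSketch {
  private boolean hasValue = true;
  private Integer max = null;

  // Returns false once there is not enough information to produce a result.
  boolean update(FileStats file) {
    if (!hasValue) {
      return false;
    }

    if (file.allNulls) {
      // Only nulls: nothing to contribute, so the file can be skipped safely.
      return true;
    }

    if (file.upperBound == null) {
      // Missing bound on a file that may hold values: the max is unknown.
      hasValue = false;
      return false;
    }

    if (max == null || file.upperBound > max) {
      max = file.upperBound;
    }

    return true;
  }

  // Null when any file lacked the information needed for a trustworthy max.
  Integer result() {
    return hasValue ? max : null;
  }

  static Integer maxOf(List<FileStats> files) {
    MaxSketch aggregator = new MaxSketch();
    for (FileStats file : files) {
      if (!aggregator.update(file)) {
        break; // stop aggregating; the result will be null
      }
    }
    return aggregator.result();
  }
}
```

   With three files that have bounds plus one file with no bound, `maxOf` 
returns `null` instead of a partial max. A file known to contain only nulls is 
skipped and does not invalidate the result.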



