rdblue commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1103895399


##########
api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java:
##########
@@ -44,4 +66,100 @@ public Type type() {
       return term().type();
     }
   }
+
+  public String columnName() {
+    if (op() == Operation.COUNT_STAR) {
+      return "*";
+    } else {
+      return ref().name();
+    }
+  }
+
+  public String describe() {
+    switch (op()) {
+      case COUNT_STAR:
+        return "count(*)";
+      case COUNT:
+        return "count(" + ExpressionUtil.describe(term()) + ")";
+      case MAX:
+        return "max(" + ExpressionUtil.describe(term()) + ")";
+      case MIN:
+        return "min(" + ExpressionUtil.describe(term()) + ")";
+      default:
+        throw new UnsupportedOperationException("Unsupported aggregate type: " + op());
+    }
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key) {
+    return safeGet(map, key, null);
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {
+    if (map != null) {
+      return map.getOrDefault(key, defaultValue);
+    }
+
+    return null;
+  }
+
+  interface Aggregator<R> {
+    void update(StructLike struct);
+
+    void update(DataFile file);
+
+    R result();
+
+    boolean hasValue();
+  }
+
+  abstract static class NullSafeAggregator<T, R> implements Aggregator<R> {
+    private final BoundAggregate<T, R> aggregate;
+    private boolean hasValue = true;
+
+    NullSafeAggregator(BoundAggregate<T, R> aggregate) {
+      this.aggregate = aggregate;
+    }
+
+    protected abstract void update(R value);
+
+    protected abstract R current();
+
+    @Override
+    public void update(StructLike struct) {
+      if (hasValue) {
+        R value = aggregate.eval(struct);
+        if (aggregate.canPushDown) {
+          update(value);
+        } else {
+          this.hasValue = false;
+        }
+      }
+    }
+
+    @Override
+    public void update(DataFile file) {
+      if (hasValue) {
+        R value = aggregate.eval(file);

Review Comment:
   Rather than setting `canPushDown` in the aggregator, I think that this needs 
to distinguish between a `null` value and a missing value. To do that, there 
are a few options, but I think the cleanest is to add a `hasValue(DataFile 
file)` method to check before calling `eval`. If that returns false, then 
`hasValue` is set to `false` and no more aggregation is done.
   
   In addition, this method should only call `update` when values are non-null. 
Aggregation skips null values for count, min, and max, and count-star will 
return the number of rows in a file and is always non-null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

