agavra commented on code in PR #10120:
URL: https://github.com/apache/pinot/pull/10120#discussion_r1072421262


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -211,55 +221,103 @@ private static Boolean mergeBoolOr(Object left, Object 
right) {
     return ((Boolean) left) || ((Boolean) right);
   }
 
-  private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
-    Object[] keyElements = new Object[groupSet.size()];
-    for (int i = 0; i < groupSet.size(); i++) {
-      keyElements[i] = row[((RexExpression.InputRef) 
groupSet.get(i)).getIndex()];
+  // NOTE: the below two classes are needed depending on where the
+  // fourth moment is being executed - if the leaf stage gets a
+  // fourth moment pushed down to it, it will return a PinotFourthMoment
+  // as the result of the aggregation. If it is not possible (e.g. the
+  // input to the aggregate requires the result of a JOIN - such as
+  // FOURTHMOMENT(t1.a + t2.a)) then the input to the aggregate in the
+  // intermediate stage is a numeric.
+
+  private static class MergeFourthMomentNumeric implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      ((PinotFourthMoment) left).increment(((Number) right).doubleValue());
+      return left;
+    }
+
+    @Override
+    public Object initialize(Object other) {
+      PinotFourthMoment moment = new PinotFourthMoment();
+      moment.increment(((Number) other).doubleValue());
+      return moment;
     }
-    return new Key(keyElements);
   }
 
-  interface Merger extends BiFunction<Object, Object, Object> {
+  private static class MergeFourthMomentObject implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      PinotFourthMoment agg = (PinotFourthMoment) left;
+      agg.combine((PinotFourthMoment) right);
+      return agg;
+    }
+  }
+
+  interface Merger {
+    /**
+     * Initializes the merger based on the first input
+     */
+    default Object initialize(Object other) {
+      return other;
+    }
+
+    /**
+     * Merges the existing aggregate (the result of {@link 
#initialize(Object)}) with
+     * the new value coming in (which may be an aggregate in and of itself).
+     */
+    Object merge(Object agg, Object value);
   }
 
   private static class Accumulator {
 
-    private static final Map<String, Merger> MERGERS = ImmutableMap
-        .<String, Merger>builder()
-        .put("SUM", AggregateOperator::mergeSum)
-        .put("$SUM", AggregateOperator::mergeSum)
-        .put("$SUM0", AggregateOperator::mergeSum)
-        .put("MIN", AggregateOperator::mergeMin)
-        .put("$MIN", AggregateOperator::mergeMin)
-        .put("$MIN0", AggregateOperator::mergeMin)
-        .put("MAX", AggregateOperator::mergeMax)
-        .put("$MAX", AggregateOperator::mergeMax)
-        .put("$MAX0", AggregateOperator::mergeMax)
-        .put("COUNT", AggregateOperator::mergeCount)
-        .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
-        .put("BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
+    private static final Map<String, Function<DataSchema.ColumnDataType, 
Merger>> MERGERS = ImmutableMap
+        .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
+        .put("SUM", cdt -> AggregateOperator::mergeSum)

Review Comment:
   column data type



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to