walterddr commented on code in PR #10120:
URL: https://github.com/apache/pinot/pull/10120#discussion_r1072360941


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -250,6 +252,9 @@ public static Object[] extractRowFromDataBlock(DataBlock 
dataBlock, int rowId, D
           case TIMESTAMP_ARRAY:
             row[colId] = 
DataSchema.ColumnDataType.TIMESTAMP_ARRAY.convert(dataBlock.getLongArray(rowId, 
colId));
             break;
+          case OBJECT:
+            row[colId] = 
customObjectSerde.apply(dataBlock.getCustomObject(rowId, colId));

Review Comment:
   I feel like we don't need to pass customObjectSerde as a generic argument,
   since the two are actually always used together.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java:
##########
@@ -159,6 +161,9 @@ public Double extractFinalResult(PinotFourthMoment m4) {
         return m4.kurtosis();
       case SKEWNESS:
         return m4.skew();
+      case MOMENT:
+        // this should never happen, as we're not extracting
+        // final result when using this method

Review Comment:
   let's still throw a proper exception here?
   ```suggestion
           throw new UnsupportedOperationException(
               "Fourth moment cannot be used as aggregation function directly");
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -211,55 +221,103 @@ private static Boolean mergeBoolOr(Object left, Object 
right) {
     return ((Boolean) left) || ((Boolean) right);
   }
 
-  private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
-    Object[] keyElements = new Object[groupSet.size()];
-    for (int i = 0; i < groupSet.size(); i++) {
-      keyElements[i] = row[((RexExpression.InputRef) 
groupSet.get(i)).getIndex()];
+  // NOTE: the below two classes are needed depending on where the
+  // fourth moment is being executed - if the leaf stage gets a
+  // fourth moment pushed down to it, it will return a PinotFourthMoment
+  // as the result of the aggregation. If it is not possible (e.g. the
+  // input to the aggregate requires the result of a JOIN - such as
+  // FOURTHMOMENT(t1.a + t2.a)) then the input to the aggregate in the
+  // intermediate stage is a numeric.
+
+  private static class MergeFourthMomentNumeric implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      ((PinotFourthMoment) left).increment(((Number) right).doubleValue());
+      return left;
+    }
+
+    @Override
+    public Object initialize(Object other) {
+      PinotFourthMoment moment = new PinotFourthMoment();
+      moment.increment(((Number) other).doubleValue());
+      return moment;
     }
-    return new Key(keyElements);
   }
 
-  interface Merger extends BiFunction<Object, Object, Object> {
+  private static class MergeFourthMomentObject implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      PinotFourthMoment agg = (PinotFourthMoment) left;
+      agg.combine((PinotFourthMoment) right);
+      return agg;
+    }
+  }
+
+  interface Merger {
+    /**
+     * Initializes the merger based on the first input
+     */
+    default Object initialize(Object other) {
+      return other;
+    }
+
+    /**
+     * Merges the existing aggregate (the result of {@link 
#initialize(Object)}) with
+     * the new value coming in (which may be an aggregate in and of itself).
+     */
+    Object merge(Object agg, Object value);
   }
 
   private static class Accumulator {
 
-    private static final Map<String, Merger> MERGERS = ImmutableMap
-        .<String, Merger>builder()
-        .put("SUM", AggregateOperator::mergeSum)
-        .put("$SUM", AggregateOperator::mergeSum)
-        .put("$SUM0", AggregateOperator::mergeSum)
-        .put("MIN", AggregateOperator::mergeMin)
-        .put("$MIN", AggregateOperator::mergeMin)
-        .put("$MIN0", AggregateOperator::mergeMin)
-        .put("MAX", AggregateOperator::mergeMax)
-        .put("$MAX", AggregateOperator::mergeMax)
-        .put("$MAX0", AggregateOperator::mergeMax)
-        .put("COUNT", AggregateOperator::mergeCount)
-        .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
-        .put("BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
+    private static final Map<String, Function<DataSchema.ColumnDataType, 
Merger>> MERGERS = ImmutableMap
+        .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
+        .put("SUM", cdt -> AggregateOperator::mergeSum)

Review Comment:
   qq: what does `cdt` mean?



##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java:
##########
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.PinotFourthMomentAggregateFunction;
+import org.apache.calcite.sql.fun.PinotKurtosisAggregateFunction;
+import org.apache.calcite.sql.fun.PinotOperatorTable;
+import org.apache.calcite.sql.fun.PinotSkewnessAggregateFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+
+
+/**
+ * This rule rewrites aggregate functions when necessary for Pinot's
+ * multistage engine. For example, SKEWNESS must be rewritten into two
+ * parts: a multi-stage FOURTH_MOMENT calculation and then a scalar function
+ * that reduces the moment into the skewness at the end. This is to ensure
+ * that the aggregation computation can merge partial results from different
+ * intermediate nodes before reducing it into the final result.
+ *
+ * Also see {@link AggregateReduceFunctionsRule}, as this implementation
+ * closely follows that.

Review Comment:
   nit
   ```suggestion
    * This implementation closely follows Calcite's AggregateReduceFunctionsRule.
    * @see org.apache.calcite.rel.rules.AggregateReduceFunctionsRule
   ```



##########
pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java:
##########
@@ -46,8 +50,15 @@ public class PinotOperatorTable extends SqlStdOperatorTable {
   private static @MonotonicNonNull PinotOperatorTable _instance;
 
   public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction();
+  public static final SqlFunction SKEWNESS_REDUCE = new 
SqlFunction("SKEWNESS_REDUCE", SqlKind.OTHER_FUNCTION,
+      ReturnTypes.DOUBLE, null, OperandTypes.BINARY, 
SqlFunctionCategory.USER_DEFINED_FUNCTION);
+  public static final SqlFunction KURTOSIS_REDUCE = new 
SqlFunction("KURTOSIS_REDUCE", SqlKind.OTHER_FUNCTION,
+      ReturnTypes.DOUBLE, null, OperandTypes.BINARY, 
SqlFunctionCategory.USER_DEFINED_FUNCTION);
+
   public static final SqlAggFunction BOOL_AND = new 
PinotBoolAndAggregateFunction();
   public static final SqlAggFunction BOOL_OR = new 
PinotBoolOrAggregateFunction();
+  public static final SqlAggFunction SKEWNESS = 
PinotSkewnessAggregateFunction.INSTANCE;
+  public static final SqlAggFunction KURTOSIS = 
PinotKurtosisAggregateFunction.INSTANCE;

Review Comment:
   nit: unrelated to this PR, but we can make the rest of the operators
   also use static instances.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -283,7 +283,11 @@ private static Object[] canonicalizeRow(Object[] row, 
DataSchema dataSchema) {
     for (int colId = 0; colId < row.length; colId++) {
       Object value = row[colId];
       if (value != null) {
-        resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+        if (dataSchema.getColumnDataType(colId) == 
DataSchema.ColumnDataType.OBJECT) {
+          resultRow[colId] = value;
+        } else {
+          resultRow[colId] = 
dataSchema.getColumnDataType(colId).convert(value);
+        }

Review Comment:
   Why do we need the additional if check here? I thought dataSchema.convert
   already handled OBJECT?



##########
pinot-query-runtime/src/test/resources/queries/Skew.json:
##########
@@ -0,0 +1,80 @@
+{
+  "skew": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      }
+    },
+    "queries": [
+      {
+        "ignored": true,

Review Comment:
   Please un-ignore this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to