This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1231a2c985 [multistage] support aggregations that require intermediate representations (#10120)
1231a2c985 is described below

commit 1231a2c9852d9c22b1ae7268aab69357347b24b6
Author: Almog Gavra <almog.ga...@gmail.com>
AuthorDate: Wed Jan 18 17:11:21 2023 -0800

    [multistage] support aggregations that require intermediate representations (#10120)
    
    * [multistage] support aggregations that require intermediate representations
    
    * move CustomObject to top level class
    
    * fix case when intermediate stage was first to aggregate
    
    * address Rong's comments
    
    * address feedback
    
    * move InternalReduceFunctions to new package
---
 .../java/org/apache/pinot/common/CustomObject.java |  30 +--
 .../pinot/common/datablock/BaseDataBlock.java      |  15 ++
 .../apache/pinot/common/datablock/DataBlock.java   |   3 +
 .../pinot/common/datablock/DataBlockUtils.java     |  13 +-
 .../pinot/common/datatable/BaseDataTable.java      |   1 +
 .../apache/pinot/common/datatable/DataTable.java   |  22 +--
 .../pinot/common/datatable/DataTableImplV4.java    |   1 +
 .../org/apache/pinot/common/utils/DataSchema.java  |   2 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |   4 +-
 .../core/common/datablock/DataBlockBuilder.java    |   4 +-
 .../common/datatable/BaseDataTableBuilder.java     |   4 +-
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionUtils.java         |   3 +-
 .../function/FourthMomentAggregationFunction.java  |   8 +-
 .../query/reduce/DistinctDataTableReducer.java     |   3 +-
 .../core/query/reduce/GroupByDataTableReducer.java |   3 +-
 .../reduce/function/InternalReduceFunctions.java   |  32 ++--
 .../core/common/datatable/DataTableSerDeTest.java  |   3 +-
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   1 +
 .../rules/PinotReduceAggregateFunctionsRule.java   | 201 +++++++++++++++++++++
 .../sql/fun/PinotBoolAndAggregateFunction.java     |   4 +-
 .../sql/fun/PinotBoolOrAggregateFunction.java      |   4 +-
 ...ava => PinotFourthMomentAggregateFunction.java} |  11 +-
 ...on.java => PinotKurtosisAggregateFunction.java} |  11 +-
 .../apache/calcite/sql/fun/PinotOperatorTable.java |  15 +-
 ...on.java => PinotSkewnessAggregateFunction.java} |  11 +-
 .../query/planner/logical/RelToStageConverter.java |   8 +-
 .../query/runtime/blocks/TransferableBlock.java    |   3 +-
 .../query/runtime/operator/AggregateOperator.java  | 124 +++++++++----
 .../LeafStageTransferableBlockOperator.java        |   6 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   2 +-
 .../pinot/query/service/QueryDispatcher.java       |   3 +-
 .../runtime/operator/AggregateOperatorTest.java    |  30 +--
 .../src/test/resources/queries/Skew.json           |  86 +++++++++
 .../pinot/segment/spi/AggregationFunctionType.java |   1 +
 35 files changed, 548 insertions(+), 126 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
similarity index 56%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
copy to pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
index 3d336c1070..355fff08db 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
@@ -17,21 +17,27 @@
  * under the License.
  */
 
-package org.apache.calcite.sql.fun;
+package org.apache.pinot.common;
 
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.util.Optionality;
+import java.nio.ByteBuffer;
 
 
-public class PinotBoolOrAggregateFunction extends SqlAggFunction {
+public class CustomObject {
+  public static final int NULL_TYPE_VALUE = 100;
 
-  public PinotBoolOrAggregateFunction() {
-    super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
-        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
-        false, false, Optionality.FORBIDDEN);
+  private final int _type;
+  private final ByteBuffer _buffer;
+
+  public CustomObject(int type, ByteBuffer buffer) {
+    _type = type;
+    _buffer = buffer;
+  }
+
+  public int getType() {
+    return _type;
+  }
+
+  public ByteBuffer getBuffer() {
+    return _buffer;
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index ebbb6e7a97..27f0b5f732 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTableImplV3;
 import org.apache.pinot.common.datatable.DataTableUtils;
 import org.apache.pinot.common.response.ProcessingException;
@@ -345,6 +346,20 @@ public abstract class BaseDataBlock implements DataBlock {
     return strings;
   }
 
+  @Nullable
+  @Override
+  public CustomObject getCustomObject(int rowId, int colId) {
+    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
+    int type = _variableSizeData.getInt();
+    if (size == 0) {
+      assert type == CustomObject.NULL_TYPE_VALUE;
+      return null;
+    }
+    ByteBuffer buffer = _variableSizeData.slice();
+    buffer.limit(size);
+    return new CustomObject(type, buffer);
+  }
+
   @Nullable
   @Override
   public RoaringBitmap getNullRowIds(int colId) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
index ed8d40760a..418426b4ac 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -74,6 +75,8 @@ public interface DataBlock {
 
   String[] getStringArray(int rowId, int colId);
 
+  CustomObject getCustomObject(int rowId, int colId);
+
   @Nullable
   RoaringBitmap getNullRowIds(int colId);
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 8b41969e8e..cd9a729c8b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -87,14 +89,14 @@ public final class DataBlockUtils {
     }
   }
 
-  public static List<Object[]> extractRows(DataBlock dataBlock) {
+  public static List<Object[]> extractRows(DataBlock dataBlock, Function<CustomObject, Object> customObjectSerde) {
     DataSchema dataSchema = dataBlock.getDataSchema();
     DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     RoaringBitmap[] nullBitmaps = extractNullBitmaps(dataBlock);
     int numRows = dataBlock.getNumberOfRows();
     List<Object[]> rows = new ArrayList<>(numRows);
     for (int rowId = 0; rowId < numRows; rowId++) {
-      rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes, nullBitmaps));
+      rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes, nullBitmaps, customObjectSerde));
     }
     return rows;
   }
@@ -189,8 +191,8 @@ public final class DataBlockUtils {
     return nullBitmaps;
   }
 
-  public static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId, DataSchema.ColumnDataType[] dataTypes,
-      RoaringBitmap[] nullBitmaps) {
+  private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId, DataSchema.ColumnDataType[] dataTypes,
+      RoaringBitmap[] nullBitmaps, Function<CustomObject, Object> customObjectSerde) {
     int numColumns = nullBitmaps.length;
     Object[] row = new Object[numColumns];
     for (int colId = 0; colId < numColumns; colId++) {
@@ -250,6 +252,9 @@ public final class DataBlockUtils {
           case TIMESTAMP_ARRAY:
             row[colId] = DataSchema.ColumnDataType.TIMESTAMP_ARRAY.convert(dataBlock.getLongArray(rowId, colId));
             break;
+          case OBJECT:
+            row[colId] = customObjectSerde.apply(dataBlock.getCustomObject(rowId, colId));
+            break;
           default:
             throw new IllegalStateException(
                 String.format("Unsupported data type: %s for column: %s", dataTypes[colId], colId));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
index d4e493589a..06ba4b34f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 5a5f323c93..9bac9706a7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -21,10 +21,10 @@ package org.apache.pinot.common.datatable;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -89,26 +89,6 @@ public interface DataTable {
 
   DataTable toDataOnlyDataTable();
 
-  class CustomObject {
-    public static final int NULL_TYPE_VALUE = 100;
-
-    private final int _type;
-    private final ByteBuffer _buffer;
-
-    public CustomObject(int type, ByteBuffer buffer) {
-      _type = type;
-      _buffer = buffer;
-    }
-
-    public int getType() {
-      return _type;
-    }
-
-    public ByteBuffer getBuffer() {
-      return _buffer;
-    }
-  }
-
   enum MetadataValueType {
     INT, LONG, STRING
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index 78fa5606b6..d4d27634f9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 9c7b985bfd..4854f5bf22 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -422,6 +422,8 @@ public class DataSchema {
           return toTimestampArray(value);
         case BYTES_ARRAY:
           return (byte[][]) value;
+        case OBJECT:
+          return (Serializable) value;
         default:
           throw new IllegalStateException(String.format("Cannot convert: '%s' to type: %s", value, this));
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index fbfee474e9..a01f02a5c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -59,7 +59,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -1244,7 +1244,7 @@ public class ObjectSerDeUtils {
     return SER_DES[objectTypeValue].serialize(value);
   }
 
-  public static <T> T deserialize(DataTable.CustomObject customObject) {
+  public static <T> T deserialize(CustomObject customObject) {
     return (T) SER_DES[customObject.getType()].deserialize(customObject.getBuffer());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 19ed9cd05a..573b3beadf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -28,11 +28,11 @@ import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -524,7 +524,7 @@ public class DataBlockBuilder {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
     if (value == null) {
       byteBuffer.putInt(0);
-      builder._variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
+      builder._variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE);
     } else {
       int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
index 9547924ac5..c0a8ff8ea1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTableUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -103,7 +103,7 @@ public abstract class BaseDataTableBuilder implements DataTableBuilder {
     _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
     if (value == null) {
       _currentRowDataByteBuffer.putInt(0);
-      _variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
+      _variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE);
     } else {
       int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 9b571ff3c4..e7d1cba0e4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -297,6 +297,8 @@ public class AggregationFunctionFactory {
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.SKEWNESS);
           case KURTOSIS:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
+          case FOURTHMOMENT:
+            return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
           default:
             throw new IllegalArgumentException();
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 6b1dd21e3c..89e9d3d8e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
@@ -142,7 +143,7 @@ public class AggregationFunctionUtils {
       case DOUBLE:
         return dataTable.getDouble(rowId, colId);
       case OBJECT:
-        DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+        CustomObject customObject = dataTable.getCustomObject(rowId, colId);
         return customObject != null ? ObjectSerDeUtils.deserialize(customObject) : null;
       default:
         throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
index 9cb06e4eeb..a6f4b707ff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
@@ -36,7 +36,7 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF
   private final Type _type;
 
   enum Type {
-    KURTOSIS, SKEWNESS
+    KURTOSIS, SKEWNESS, MOMENT
   }
 
   public FourthMomentAggregationFunction(ExpressionContext expression, Type type) {
@@ -51,6 +51,8 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF
         return AggregationFunctionType.KURTOSIS;
       case SKEWNESS:
         return AggregationFunctionType.SKEWNESS;
+      case MOMENT:
+        return AggregationFunctionType.FOURTHMOMENT;
       default:
         throw new IllegalArgumentException("Unexpected type " + _type);
     }
@@ -159,6 +161,10 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF
         return m4.kurtosis();
       case SKEWNESS:
         return m4.skew();
+      case MOMENT:
+        // this should never happen, as we're not extracting
+        // final result when using this method
+        throw new UnsupportedOperationException("Fourth moment cannot be used as aggregation function directly");
       default:
         throw new IllegalStateException("Unexpected value: " + _type);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index af25afe24e..de65c18657 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -77,7 +78,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
       int numColumns = dataSchema.size();
       if (numColumns == 1 && dataSchema.getColumnDataType(0) == ColumnDataType.OBJECT) {
         // DistinctTable is still being returned as a single object
-        DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+        CustomObject customObject = dataTable.getCustomObject(0, 0);
         assert customObject != null;
         DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
         if (!distinctTable.isEmpty()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 960cf1cb07..cb5975d02b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerGauge;
@@ -318,7 +319,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
                       break;
                     case OBJECT:
                       // TODO: Move ser/de into AggregationFunction interface
-                      DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+                      CustomObject customObject = dataTable.getCustomObject(rowId, colId);
                       if (customObject != null) {
                         values[colId] = ObjectSerDeUtils.deserialize(customObject);
                       }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
similarity index 54%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
index 3610ce0c4d..e56e82298e 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java
@@ -17,21 +17,29 @@
  * under the License.
  */
 
-package org.apache.calcite.sql.fun;
+package org.apache.pinot.core.query.reduce.function;
 
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.util.Optionality;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
+import org.apache.pinot.spi.annotations.ScalarFunction;
 
 
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+/**
+ * This class contains functions that are necessary for the multistage engine
+ * aggregations that need to be reduced after the initial aggregation to get
+ * the final result.
+ */
+public class InternalReduceFunctions {
+
+  private InternalReduceFunctions() {
+  }
+
+  @ScalarFunction
+  public static double skewnessReduce(PinotFourthMoment fourthMoment) {
+    return fourthMoment.skew();
+  }
 
-  public PinotBoolAndAggregateFunction() {
-    super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
-        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
-        false, false, Optionality.FORBIDDEN);
+  @ScalarFunction
+  public static double kurtosisReduce(PinotFourthMoment fourthMoment) {
+    return fourthMoment.kurtosis();
   }
 }
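
A minimal usage sketch of the reduce functions above, assuming only the PinotFourthMoment API that appears elsewhere in this patch (increment, combine, skew, kurtosis); the sample values and variable names are hypothetical, not part of the commit:

    PinotFourthMoment serverA = new PinotFourthMoment();
    serverA.increment(1.0);
    serverA.increment(2.0);

    PinotFourthMoment serverB = new PinotFourthMoment();
    serverB.increment(3.0);

    // merge the intermediate representations coming from different servers/stages
    serverA.combine(serverB);

    // reduce the merged moment into the final statistics
    double skewness = InternalReduceFunctions.skewnessReduce(serverA);
    double kurtosis = InternalReduceFunctions.kurtosisReduce(serverA);
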
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 22b0e92630..9b53e87354 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.datatable.DataTableFactory;
@@ -734,7 +735,7 @@ public class DataTableSerDeTest {
                 ERROR_MESSAGE);
             break;
           case OBJECT:
-            DataTable.CustomObject customObject = newDataTable.getCustomObject(rowId, colId);
+            CustomObject customObject = newDataTable.getCustomObject(rowId, colId);
             if (isNull) {
               Assert.assertNull(customObject, ERROR_MESSAGE);
             } else {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 1828abd0e8..43ec25dfba 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -82,6 +82,7 @@ public class PinotQueryRuleSets {
           CoreRules.AGGREGATE_UNION_AGGREGATE,
 
           // reduce aggregate functions like AVG, STDDEV_POP etc.
+          PinotReduceAggregateFunctionsRule.INSTANCE,
           CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
 
           // remove unnecessary sort rule
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java
new file mode 100644
index 0000000000..5c6fd18e86
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.PinotFourthMomentAggregateFunction;
+import org.apache.calcite.sql.fun.PinotKurtosisAggregateFunction;
+import org.apache.calcite.sql.fun.PinotOperatorTable;
+import org.apache.calcite.sql.fun.PinotSkewnessAggregateFunction;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+
+
+/**
+ * This rule rewrites aggregate functions when necessary for Pinot's
+ * multistage engine. For example, SKEWNESS must be rewritten into two
+ * parts: a multi-stage FOURTH_MOMENT calculation and then a scalar function
+ * that reduces the moment into the skewness at the end. This is to ensure
+ * that the aggregation computation can merge partial results from different
+ * intermediate nodes before reducing it into the final result.
+ *
+ * <p>This implementation follows closely with Calcite's
+ * {@link AggregateReduceFunctionsRule}.
+ */
+public class PinotReduceAggregateFunctionsRule extends RelOptRule {
+
+  public static final PinotReduceAggregateFunctionsRule INSTANCE =
+      new PinotReduceAggregateFunctionsRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  private static final Set<String> FUNCTIONS = ImmutableSet.of(
+      PinotSkewnessAggregateFunction.SKEWNESS,
+      PinotKurtosisAggregateFunction.KURTOSIS
+  );
+
+  protected PinotReduceAggregateFunctionsRule(RelBuilderFactory factory) {
+    super(operand(Aggregate.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1) {
+      return false;
+    }
+
+    if (call.rel(0) instanceof Aggregate) {
+      Aggregate agg = call.rel(0);
+      for (AggregateCall aggCall : agg.getAggCallList()) {
+        if (canReduce(aggCall)) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Aggregate oldAggRel = call.rel(0);
+    reduceAggs(call, oldAggRel);
+  }
+
+  private void reduceAggs(RelOptRuleCall ruleCall, Aggregate oldAggRel) {
+    RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+
+    List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+    final int groupCount = oldAggRel.getGroupCount();
+
+    final List<AggregateCall> newCalls = new ArrayList<>();
+    final Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>();
+
+    final List<RexNode> projList = new ArrayList<>();
+
+    // pass through group key
+    for (int i = 0; i < groupCount; i++) {
+      projList.add(rexBuilder.makeInputRef(oldAggRel, i));
+    }
+
+    // List of input expressions. If a particular aggregate needs more, it
+    // will add an expression to the end, and we will create an extra project
+    final RelBuilder relBuilder = ruleCall.builder();
+    relBuilder.push(oldAggRel.getInput());
+    final List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields());
+
+    // create new aggregate function calls and rest of project list together
+    for (AggregateCall oldCall : oldCalls) {
+      projList.add(
+          reduceAgg(oldAggRel, oldCall, newCalls, aggCallMapping, inputExprs));
+    }
+
+    final int extraArgCount = inputExprs.size() - relBuilder.peek().getRowType().getFieldCount();
+    if (extraArgCount > 0) {
+      relBuilder.project(inputExprs,
+          CompositeList.of(
+              relBuilder.peek().getRowType().getFieldNames(),
+              Collections.nCopies(extraArgCount, null)));
+    }
+    newAggregateRel(relBuilder, oldAggRel, newCalls);
+    newCalcRel(relBuilder, oldAggRel.getRowType(), projList);
+    ruleCall.transformTo(relBuilder.build());
+  }
+
+  private RexNode reduceAgg(Aggregate oldAggRel, AggregateCall oldCall, List<AggregateCall> newCalls,
+      Map<AggregateCall, RexNode> aggCallMapping, List<RexNode> inputExprs) {
+    if (canReduce(oldCall)) {
+      switch (oldCall.getAggregation().getName()) {
+        case PinotSkewnessAggregateFunction.SKEWNESS:
+          return reduceFourthMoment(oldAggRel, oldCall, newCalls, aggCallMapping, false);
+        case PinotKurtosisAggregateFunction.KURTOSIS:
+          return reduceFourthMoment(oldAggRel, oldCall, newCalls, aggCallMapping, true);
+        default:
+          throw new IllegalStateException("Unexpected aggregation name " + oldCall.getAggregation().getName());
+      }
+    } else {
+      // anything else:  preserve original call
+      RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+      final int nGroups = oldAggRel.getGroupCount();
+      return rexBuilder.addAggCall(oldCall,
+          nGroups,
+          newCalls,
+          aggCallMapping,
+          oldAggRel.getInput()::fieldIsNullable);
+    }
+  }
+
+  private RexNode reduceFourthMoment(Aggregate oldAggRel, AggregateCall oldCall, List<AggregateCall> newCalls,
+      Map<AggregateCall, RexNode> aggCallMapping, boolean isKurtosis) {
+    final int nGroups = oldAggRel.getGroupCount();
+    final RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+    final AggregateCall fourthMomentCall =
+        AggregateCall.create(PinotFourthMomentAggregateFunction.INSTANCE,
+            oldCall.isDistinct(),
+            oldCall.isApproximate(),
+            oldCall.ignoreNulls(),
+            oldCall.getArgList(),
+            oldCall.filterArg,
+            oldCall.distinctKeys,
+            oldCall.collation,
+            oldAggRel.getGroupCount(),
+            oldAggRel.getInput(),
+            null,
+            null);
+
+    RexNode fmRef = rexBuilder.addAggCall(fourthMomentCall, nGroups, newCalls,
+        aggCallMapping, oldAggRel.getInput()::fieldIsNullable);
+
+    final RexNode skewRef = rexBuilder.makeCall(
+        isKurtosis ? PinotOperatorTable.KURTOSIS_REDUCE : PinotOperatorTable.SKEWNESS_REDUCE,
+        fmRef);
+    return rexBuilder.makeCast(oldCall.getType(), skewRef);
+  }
+
+  private boolean canReduce(AggregateCall call) {
+    return FUNCTIONS.contains(call.getAggregation().getName());
+  }
+
+  protected void newAggregateRel(RelBuilder relBuilder,
+      Aggregate oldAggregate,
+      List<AggregateCall> newCalls) {
+    relBuilder.aggregate(
+        relBuilder.groupKey(oldAggregate.getGroupSet(), oldAggregate.getGroupSets()),
+        newCalls);
+  }
+
+  protected void newCalcRel(RelBuilder relBuilder,
+      RelDataType rowType,
+      List<RexNode> exprs) {
+    relBuilder.project(exprs, rowType.getFieldNames());
+  }
+}
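
For reference, an illustrative (non-authoritative) view of the rewrite this rule performs; the plan shapes and field indexes below are approximations, not actual planner output:

    // before:  Aggregate(group=[{0}], agg#0=[SKEWNESS($1)])
    // after:   Project($0, CAST(SKEWNESS_REDUCE($1)):DOUBLE)
    //            Aggregate(group=[{0}], agg#0=[FOURTHMOMENT($1)])
    // The FOURTHMOMENT partials can be merged across intermediate stages
    // before SKEWNESS_REDUCE (or KURTOSIS_REDUCE) produces the final double.
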
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
index 3610ce0c4d..7963fd02e3 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
@@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality;
 
 public class PinotBoolAndAggregateFunction extends SqlAggFunction {
 
-  public PinotBoolAndAggregateFunction() {
+  public static final PinotBoolAndAggregateFunction INSTANCE = new PinotBoolAndAggregateFunction();
+
+  private PinotBoolAndAggregateFunction() {
     super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
         null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
         false, false, Optionality.FORBIDDEN);
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
index 3d336c1070..547edef887 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
@@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality;
 
 public class PinotBoolOrAggregateFunction extends SqlAggFunction {
 
-  public PinotBoolOrAggregateFunction() {
+  public static final PinotBoolOrAggregateFunction INSTANCE = new PinotBoolOrAggregateFunction();
+
+  private PinotBoolOrAggregateFunction() {
     super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
         null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
         false, false, Optionality.FORBIDDEN);
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
similarity index 73%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
index 3610ce0c4d..2d9ca9e3a0 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java
@@ -24,14 +24,17 @@ import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Optionality;
 
 
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotFourthMomentAggregateFunction extends SqlAggFunction {
 
-  public PinotBoolAndAggregateFunction() {
-    super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
-        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  public static final PinotFourthMomentAggregateFunction INSTANCE = new PinotFourthMomentAggregateFunction();
+
+  public PinotFourthMomentAggregateFunction() {
+    super("FOURTHMOMENT", null, SqlKind.OTHER_FUNCTION, ReturnTypes.explicit(SqlTypeName.OTHER),
+        null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
         false, false, Optionality.FORBIDDEN);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
similarity index 74%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
index 3610ce0c4d..f2d7639625 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java
@@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.util.Optionality;
 
 
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotKurtosisAggregateFunction extends SqlAggFunction {
 
-  public PinotBoolAndAggregateFunction() {
-    super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
-        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  public static final String KURTOSIS = "KURTOSIS";
+  public static final PinotKurtosisAggregateFunction INSTANCE = new PinotKurtosisAggregateFunction();
+
+  public PinotKurtosisAggregateFunction() {
+    super(KURTOSIS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE,
+        null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
         false, false, Optionality.FORBIDDEN);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
index 8f46a0db4e..17af5a7d6d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java
@@ -23,7 +23,11 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.validate.SqlNameMatchers;
 import org.apache.calcite.util.Util;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -46,8 +50,15 @@ public class PinotOperatorTable extends SqlStdOperatorTable {
   private static @MonotonicNonNull PinotOperatorTable _instance;
 
   public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction();
-  public static final SqlAggFunction BOOL_AND = new PinotBoolAndAggregateFunction();
-  public static final SqlAggFunction BOOL_OR = new PinotBoolOrAggregateFunction();
+  public static final SqlFunction SKEWNESS_REDUCE = new SqlFunction("SKEWNESS_REDUCE", SqlKind.OTHER_FUNCTION,
+      ReturnTypes.DOUBLE, null, OperandTypes.BINARY, SqlFunctionCategory.USER_DEFINED_FUNCTION);
+  public static final SqlFunction KURTOSIS_REDUCE = new SqlFunction("KURTOSIS_REDUCE", SqlKind.OTHER_FUNCTION,
+      ReturnTypes.DOUBLE, null, OperandTypes.BINARY, SqlFunctionCategory.USER_DEFINED_FUNCTION);
+
+  public static final SqlAggFunction BOOL_AND = PinotBoolAndAggregateFunction.INSTANCE;
+  public static final SqlAggFunction BOOL_OR = PinotBoolOrAggregateFunction.INSTANCE;
+  public static final SqlAggFunction SKEWNESS = PinotSkewnessAggregateFunction.INSTANCE;
+  public static final SqlAggFunction KURTOSIS = PinotKurtosisAggregateFunction.INSTANCE;
 
   // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around
   // supplier instance. this should replace all lazy init static objects in the codebase
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
similarity index 74%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
index 3610ce0c4d..16857e8568 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java
@@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.util.Optionality;
 
 
-public class PinotBoolAndAggregateFunction extends SqlAggFunction {
+public class PinotSkewnessAggregateFunction extends SqlAggFunction {
 
-  public PinotBoolAndAggregateFunction() {
-    super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN,
-        null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  public static final String SKEWNESS = "SKEWNESS";
+  public static final PinotSkewnessAggregateFunction INSTANCE = new PinotSkewnessAggregateFunction();
+
+  public PinotSkewnessAggregateFunction() {
+    super(SKEWNESS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE,
+        null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION,
         false, false, Optionality.FORBIDDEN);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 72976185aa..80218c6442 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -165,6 +165,8 @@ public final class RelToStageConverter {
       case CHAR:
       case VARCHAR:
         return DataSchema.ColumnDataType.STRING;
+      case OTHER:
+        return DataSchema.ColumnDataType.OBJECT;
       case BINARY:
       case VARBINARY:
         return DataSchema.ColumnDataType.BYTES;
@@ -174,7 +176,11 @@ public final class RelToStageConverter {
   }
 
   public static FieldSpec.DataType convertToFieldSpecDataType(RelDataType relDataType) {
-    return convertToColumnDataType(relDataType).toDataType();
+    DataSchema.ColumnDataType columnDataType = convertToColumnDataType(relDataType);
+    if (columnDataType == DataSchema.ColumnDataType.OBJECT) {
+      return FieldSpec.DataType.BYTES;
+    }
+    return columnDataType.toDataType();
   }
 
   public static PinotDataType convertToPinotDataType(RelDataType relDataType) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 109764bf02..cad627b925 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 
 
@@ -87,7 +88,7 @@ public class TransferableBlock implements Block {
     if (_container == null) {
       switch (_type) {
         case ROW:
-          _container = DataBlockUtils.extractRows(_dataBlock);
+          _container = DataBlockUtils.extractRows(_dataBlock, ObjectSerDeUtils::deserialize);
           break;
         case COLUMNAR:
         case METADATA:
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index a82048949f..3182dba91f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -73,13 +74,14 @@ public class AggregateOperator extends MultiStageOperator {
   // groupSet has to be a list of InputRef and cannot be null
  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
  public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet) {
-    this(inputOperator, dataSchema, aggCalls, groupSet, AggregateOperator.Accumulator.MERGERS);
+      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema) {
+    this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS);
   }
 
   @VisibleForTesting
   AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet, Map<String, Merger> mergers) {
+      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema, Map<String,
+      Function<DataSchema.ColumnDataType, Merger>> mergers) {
     _inputOperator = inputOperator;
     _groupSet = groupSet;
     _upstreamErrorBlock = null;
@@ -96,7 +98,7 @@ public class AggregateOperator extends MultiStageOperator {
       if (!mergers.containsKey(functionName)) {
         throw new IllegalStateException("Unexpected value: " + functionName);
       }
-      _accumulators[i] = new Accumulator(agg, mergers.get(functionName));
+      _accumulators[i] = new Accumulator(agg, mergers, functionName, inputSchema);
     }
 
     _groupByKeyHolder = new HashMap<>();
@@ -184,6 +186,14 @@ public class AggregateOperator extends MultiStageOperator {
     return false;
   }
 
+  private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
+    Object[] keyElements = new Object[groupSet.size()];
+    for (int i = 0; i < groupSet.size(); i++) {
+      keyElements[i] = row[((RexExpression.InputRef) groupSet.get(i)).getIndex()];
+    }
+    return new Key(keyElements);
+  }
+
   private static Object mergeSum(Object left, Object right) {
     return ((Number) left).doubleValue() + ((Number) right).doubleValue();
   }
@@ -209,37 +219,81 @@ public class AggregateOperator extends MultiStageOperator {
     return ((Boolean) left) || ((Boolean) right);
   }
 
-  private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
-    Object[] keyElements = new Object[groupSet.size()];
-    for (int i = 0; i < groupSet.size(); i++) {
-      keyElements[i] = row[((RexExpression.InputRef) groupSet.get(i)).getIndex()];
+  // NOTE: the below two classes are needed depending on where the
+  // fourth moment is being executed - if the leaf stage gets a
+  // fourth moment pushed down to it, it will return a PinotFourthMoment
+  // as the result of the aggregation. If it is not possible (e.g. the
+  // input to the aggregate requires the result of a JOIN - such as
+  // FOURTHMOMENT(t1.a + t2.a)) then the input to the aggregate in the
+  // intermediate stage is a numeric.
+
+  private static class MergeFourthMomentNumeric implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      ((PinotFourthMoment) left).increment(((Number) right).doubleValue());
+      return left;
+    }
+
+    @Override
+    public Object initialize(Object other) {
+      PinotFourthMoment moment = new PinotFourthMoment();
+      moment.increment(((Number) other).doubleValue());
+      return moment;
     }
-    return new Key(keyElements);
   }
 
-  interface Merger extends BiFunction<Object, Object, Object> {
+  private static class MergeFourthMomentObject implements Merger {
+
+    @Override
+    public Object merge(Object left, Object right) {
+      PinotFourthMoment agg = (PinotFourthMoment) left;
+      agg.combine((PinotFourthMoment) right);
+      return agg;
+    }
+  }
+
+  interface Merger {
+    /**
+     * Initializes the merger based on the first input
+     */
+    default Object initialize(Object other) {
+      return other;
+    }
+
+    /**
+     * Merges the existing aggregate (the result of {@link #initialize(Object)}) with
+     * the new value coming in (which may be an aggregate in and of itself).
+     */
+    Object merge(Object agg, Object value);
   }
 
   private static class Accumulator {
 
-    private static final Map<String, Merger> MERGERS = ImmutableMap
-        .<String, Merger>builder()
-        .put("SUM", AggregateOperator::mergeSum)
-        .put("$SUM", AggregateOperator::mergeSum)
-        .put("$SUM0", AggregateOperator::mergeSum)
-        .put("MIN", AggregateOperator::mergeMin)
-        .put("$MIN", AggregateOperator::mergeMin)
-        .put("$MIN0", AggregateOperator::mergeMin)
-        .put("MAX", AggregateOperator::mergeMax)
-        .put("$MAX", AggregateOperator::mergeMax)
-        .put("$MAX0", AggregateOperator::mergeMax)
-        .put("COUNT", AggregateOperator::mergeCount)
-        .put("BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND", AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd)
-        .put("BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR", AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR0", AggregateOperator::mergeBoolOr)
+    private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS = ImmutableMap
+        .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
+        .put("SUM", cdt -> AggregateOperator::mergeSum)
+        .put("$SUM", cdt -> AggregateOperator::mergeSum)
+        .put("$SUM0", cdt -> AggregateOperator::mergeSum)
+        .put("MIN", cdt -> AggregateOperator::mergeMin)
+        .put("$MIN", cdt -> AggregateOperator::mergeMin)
+        .put("$MIN0", cdt -> AggregateOperator::mergeMin)
+        .put("MAX", cdt -> AggregateOperator::mergeMax)
+        .put("$MAX", cdt -> AggregateOperator::mergeMax)
+        .put("$MAX0", cdt -> AggregateOperator::mergeMax)
+        .put("COUNT", cdt -> AggregateOperator::mergeCount)
+        .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+        .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+        .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
+        .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+        .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+        .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr)
+        .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
+        .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
+        .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
+            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
         .build();
 
     final int _inputRef;
@@ -247,17 +301,21 @@ public class AggregateOperator extends MultiStageOperator {
     final Map<Key, Object> _results = new HashMap<>();
     final Merger _merger;
 
-    Accumulator(RexExpression.FunctionCall aggCall, Merger merger) {
-      _merger = merger;
+    Accumulator(RexExpression.FunctionCall aggCall, Map<String, Function<DataSchema.ColumnDataType, Merger>> merger,
+        String functionName, DataSchema inputSchema) {
       // agg function operand should either be a InputRef or a Literal
+      DataSchema.ColumnDataType dataType;
       RexExpression rexExpression = toAggregationFunctionOperand(aggCall);
       if (rexExpression instanceof RexExpression.InputRef) {
         _inputRef = ((RexExpression.InputRef) rexExpression).getIndex();
         _literal = null;
+        dataType = inputSchema.getColumnDataType(_inputRef);
       } else {
         _inputRef = -1;
         _literal = ((RexExpression.Literal) rexExpression).getValue();
+        dataType = DataSchema.ColumnDataType.fromDataType(rexExpression.getDataType(), false);
       }
+      _merger = merger.get(functionName).apply(dataType);
     }
 
     void accumulate(Key key, Object[] row) {
@@ -268,9 +326,9 @@ public class AggregateOperator extends MultiStageOperator {
       Object value = _inputRef == -1 ? _literal : row[_inputRef];
 
       if (currentRes == null) {
-        keys.put(key, value);
+        keys.put(key, _merger.initialize(value));
       } else {
-        Object mergedResult = _merger.apply(currentRes, value);
+        Object mergedResult = _merger.merge(currentRes, value);
         _results.put(key, mergedResult);
       }
     }
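
For readers unfamiliar with the new Merger contract above: initialize() seeds the per-group state from the first input, and merge() folds each further value into it. The sketch below is purely illustrative and not part of this commit (the class name and the running sum-and-count state are hypothetical); it would sit alongside the other mergers inside AggregateOperator and relies only on the Merger interface shown in the hunk above:

    // Hypothetical illustration of the Merger contract; not part of this commit.
    // The intermediate state is a [sum, count] pair tracking a running average.
    private static class MergeRunningAverage implements Merger {

      @Override
      public Object initialize(Object other) {
        // seed the intermediate state from the first raw value of the group
        return new double[]{((Number) other).doubleValue(), 1};
      }

      @Override
      public Object merge(Object agg, Object value) {
        // fold the next raw value into the existing intermediate state
        double[] state = (double[]) agg;
        state[0] += ((Number) value).doubleValue();
        state[1]++;
        return state;
      }
    }

When the input column is OBJECT, the value arriving from the leaf stage is already an intermediate state, which is why MergeFourthMomentObject simply combines two PinotFourthMoment states instead of folding raw numbers.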
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 365d04f863..e794a84194 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -282,7 +282,11 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
     for (int colId = 0; colId < row.length; colId++) {
       Object value = row[colId];
       if (value != null) {
-        resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+        if (dataSchema.getColumnDataType(colId) == DataSchema.ColumnDataType.OBJECT) {
+          resultRow[colId] = value;
+        } else {
+          resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value);
+        }
       }
     }
     return resultRow;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 57a0949bc6..aaa140463f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -84,7 +84,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
-        node.getGroupSet());
+        node.getGroupSet(), node.getInputs().get(0).getDataSchema());
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index e29acb7206..25b7aa5e8b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -147,7 +148,7 @@ public class QueryDispatcher {
         for (int colId = 0; colId < numColumns; colId++) {
           nullBitmaps[colId] = dataBlock.getNullRowIds(colId);
         }
-        List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock);
+        List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock, ObjectSerDeUtils::deserialize);
         int rowId = 0;
         for (Object[] rawRow : rawRows) {
           Object[] row = new Object[numColumns];
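
The extra argument threads a decoder into row extraction so that OBJECT cells, which carry serialized intermediate aggregates such as the fourth-moment state, are turned back into objects before the broker finishes the reduction. The sketch below only illustrates that shape; every name and signature in it is hypothetical (the real hooks in the commit are DataBlockUtils.extractRows and ObjectSerDeUtils.deserialize):

    // Hypothetical, self-contained sketch, not the actual Pinot APIs:
    // OBJECT columns go through a pluggable decoder, other columns pass through as-is.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    public final class ObjectColumnDecodeSketch {
      static List<Object[]> extractRows(List<Object[]> encodedRows, boolean[] isObjectColumn,
          Function<Object, Object> objectDecoder) {
        List<Object[]> rows = new ArrayList<>(encodedRows.size());
        for (Object[] encoded : encodedRows) {
          Object[] row = new Object[encoded.length];
          for (int colId = 0; colId < encoded.length; colId++) {
            // only OBJECT cells are decoded; primitive columns keep their extracted value
            row[colId] = isObjectColumn[colId] ? objectDecoder.apply(encoded[colId]) : encoded[colId];
          }
          rows.add(row);
        }
        return rows;
      }
    }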
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0996b7f4d0..aa7394a6a8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -69,8 +69,9 @@ public class AggregateOperatorTest {
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
 
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build
@@ -89,8 +90,9 @@ public class AggregateOperatorTest {
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -113,7 +115,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block
@@ -137,7 +139,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -163,7 +165,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -191,10 +193,11 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     AggregateOperator.Merger merger = Mockito.mock(AggregateOperator.Merger.class);
-    Mockito.when(merger.apply(Mockito.any(), Mockito.any())).thenReturn(12d);
+    Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d);
+    Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d);
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, ImmutableMap.of(
-        "SUM", merger
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of(
+        "SUM", cdt -> merger
     ));
 
     // When:
@@ -203,7 +206,8 @@ public class AggregateOperatorTest {
     // Then:
     // should call merger twice, one from second row in first block and two from the first row
     // in second block
-    Mockito.verify(merger, Mockito.times(2)).apply(Mockito.any(), Mockito.any());
+    Mockito.verify(merger, Mockito.times(1)).initialize(Mockito.any());
+    Mockito.verify(merger, Mockito.times(2)).merge(Mockito.any(), Mockito.any());
     Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{1, 12d},
         "Expected two columns (group by key, agg value)");
   }
@@ -213,9 +217,10 @@ public class AggregateOperatorTest {
     MultiStageOperator upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     // Create an aggregation call with sum for first column and group by second column.
     RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0));
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     AggregateOperator sum0GroupBy1 =
         new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
-            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)));
+            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema);
     TransferableBlock result = sum0GroupBy1.getNextBlock();
     while (result.isNoOpBlock()) {
       result = sum0GroupBy1.getNextBlock();
@@ -237,9 +242,10 @@ public class AggregateOperatorTest {
     );
     List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
     DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
+    DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
 
     // When:
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
   }
 
   @Test
@@ -256,7 +262,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/resources/queries/Skew.json b/pinot-query-runtime/src/test/resources/queries/Skew.json
new file mode 100644
index 0000000000..7f16a6970e
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/Skew.json
@@ -0,0 +1,86 @@
+{
+  "skew": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "skew for int column",
+        "sql": "SELECT groupingCol, SKEWNESS(val), KURTOSIS(val) FROM {tbl} GROUP BY groupingCol",
+        "outputs": [
+          ["a", 0.8647536091225356, 0.3561662049861511],
+          ["b", 0.8647536091225356, 0.3561662049861511]
+        ]
+      },
+      {
+        "description": "no group by clause",
+        "sql": "SELECT SKEWNESS(val), KURTOSIS(val) FROM {tbl} WHERE groupingCol='a'",
+        "outputs": [
+          [0.8647536091225356, 0.3561662049861511]
+        ]
+      },
+      {
+        "sql": "SELECT t1.groupingCol, SKEWNESS(t1.val + t2.val), KURTOSIS(t1.val + t2.val) FROM {tbl} AS t1 LEFT JOIN {tbl2} AS t2 USING (partitionCol) GROUP BY t1.groupingCol",
+        "outputs": [
+          ["a", 0.5412443772804422, -0.001438580062540293],
+          ["b", 0.5412443772804422, -0.001438580062540293]
+        ]
+      }
+    ]
+  }
+}
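
For reference, the expected values in these fixtures can be reproduced by hand: both groups contain the same multiset of values {1, 2, 3, 4, 4, 4, 7, 9}, which is why the two output rows match, and the numbers are consistent with the bias-corrected sample skewness and excess kurtosis estimators. A small stand-alone check, not part of the commit and assuming those estimators, for the group "a" row of the first query:

    // Reproduces the expected SKEWNESS/KURTOSIS values for group "a" using the
    // bias-corrected sample skewness and excess kurtosis formulas (assumption,
    // verified only against the expected outputs above).
    public class SkewKurtosisCheck {
      public static void main(String[] args) {
        double[] vals = {1, 2, 3, 4, 4, 4, 7, 9};
        int n = vals.length;
        double mean = 0;
        for (double v : vals) {
          mean += v;
        }
        mean /= n;
        double m2 = 0, m3 = 0, m4 = 0;  // sums of squared/cubed/fourth-power deviations
        for (double v : vals) {
          double d = v - mean;
          m2 += d * d;
          m3 += d * d * d;
          m4 += d * d * d * d;
        }
        double variance = m2 / (n - 1);  // sample variance
        double skew = (n / ((double) (n - 1) * (n - 2))) * m3 / Math.pow(variance, 1.5);
        double kurt = ((double) n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))) * m4 / (variance * variance)
            - 3.0 * (n - 1) * (n - 1) / ((n - 2) * (n - 3));
        System.out.println(skew);  // ~0.8647536091225356
        System.out.println(kurt);  // ~0.3561662049861511
      }
    }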
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 8f53122bd4..7c355d0057 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -67,6 +67,7 @@ public enum AggregationFunctionType {
   STDDEVSAMP("stdDevSamp"),
   SKEWNESS("skewness"),
   KURTOSIS("kurtosis"),
+  FOURTHMOMENT("fourthmoment"),
 
   // Geo aggregation functions
   STUNION("STUnion"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
