This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 1231a2c985  [multistage] support aggregations that require intermediate representations (#10120)
1231a2c985 is described below

commit 1231a2c9852d9c22b1ae7268aab69357347b24b6
Author: Almog Gavra <almog.ga...@gmail.com>
AuthorDate: Wed Jan 18 17:11:21 2023 -0800

    [multistage] support aggregations that require intermediate representations (#10120)

    * [multistage] support aggregations that require intermediate representations
    * move CustomObject to top level class
    * fix case when intermediate stage was first to aggregate
    * address Rong's comments
    * address feedback
    * move InternalReduceFunctions to new package
---
 .../java/org/apache/pinot/common/CustomObject.java |  30 +--
 .../pinot/common/datablock/BaseDataBlock.java      |  15 ++
 .../apache/pinot/common/datablock/DataBlock.java   |   3 +
 .../pinot/common/datablock/DataBlockUtils.java     |  13 +-
 .../pinot/common/datatable/BaseDataTable.java      |   1 +
 .../apache/pinot/common/datatable/DataTable.java   |  22 +--
 .../pinot/common/datatable/DataTableImplV4.java    |   1 +
 .../org/apache/pinot/common/utils/DataSchema.java  |   2 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |   4 +-
 .../core/common/datablock/DataBlockBuilder.java    |   4 +-
 .../common/datatable/BaseDataTableBuilder.java     |   4 +-
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionUtils.java         |   3 +-
 .../function/FourthMomentAggregationFunction.java  |   8 +-
 .../query/reduce/DistinctDataTableReducer.java     |   3 +-
 .../core/query/reduce/GroupByDataTableReducer.java |   3 +-
 .../reduce/function/InternalReduceFunctions.java   |  32 ++--
 .../core/common/datatable/DataTableSerDeTest.java  |   3 +-
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   1 +
 .../rules/PinotReduceAggregateFunctionsRule.java   | 201 +++++++++++++++++++++
 .../sql/fun/PinotBoolAndAggregateFunction.java     |   4 +-
 .../sql/fun/PinotBoolOrAggregateFunction.java      |   4 +-
 ...ava => PinotFourthMomentAggregateFunction.java} |  11 +-
 ...on.java => PinotKurtosisAggregateFunction.java} |  11 +-
 .../apache/calcite/sql/fun/PinotOperatorTable.java |  15 +-
 ...on.java => PinotSkewnessAggregateFunction.java} |  11 +-
 .../query/planner/logical/RelToStageConverter.java |   8 +-
 .../query/runtime/blocks/TransferableBlock.java    |   3 +-
 .../query/runtime/operator/AggregateOperator.java  | 124 +++++++++----
 .../LeafStageTransferableBlockOperator.java        |   6 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   2 +-
 .../pinot/query/service/QueryDispatcher.java       |   3 +-
 .../runtime/operator/AggregateOperatorTest.java    |  30 +--
 .../src/test/resources/queries/Skew.json           |  86 +++++++++
 .../pinot/segment/spi/AggregationFunctionType.java |   1 +
 35 files changed, 548 insertions(+), 126 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
similarity index 56%
copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
copy to pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
index 3d336c1070..355fff08db 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/CustomObject.java
@@ -17,21 +17,27 @@
  * under the License.
*/ -package org.apache.calcite.sql.fun; +package org.apache.pinot.common; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.util.Optionality; +import java.nio.ByteBuffer; -public class PinotBoolOrAggregateFunction extends SqlAggFunction { +public class CustomObject { + public static final int NULL_TYPE_VALUE = 100; - public PinotBoolOrAggregateFunction() { - super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, - null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, - false, false, Optionality.FORBIDDEN); + private final int _type; + private final ByteBuffer _buffer; + + public CustomObject(int type, ByteBuffer buffer) { + _type = type; + _buffer = buffer; + } + + public int getType() { + return _type; + } + + public ByteBuffer getBuffer() { + return _buffer; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java index ebbb6e7a97..27f0b5f732 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTableImplV3; import org.apache.pinot.common.datatable.DataTableUtils; import org.apache.pinot.common.response.ProcessingException; @@ -345,6 +346,20 @@ public abstract class BaseDataBlock implements DataBlock { return strings; } + @Nullable + @Override + public CustomObject getCustomObject(int rowId, int colId) { + int size = positionOffsetInVariableBufferAndGetLength(rowId, colId); + int type = _variableSizeData.getInt(); + if (size == 0) { + assert type == CustomObject.NULL_TYPE_VALUE; + return null; + } + ByteBuffer buffer = _variableSizeData.slice(); + buffer.limit(size); + return new CustomObject(type, buffer); + } + @Nullable @Override public RoaringBitmap getNullRowIds(int colId) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java index ed8d40760a..418426b4ac 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.utils.ByteArray; @@ -74,6 +75,8 @@ public interface DataBlock { String[] getStringArray(int rowId, int colId); + CustomObject getCustomObject(int rowId, int colId); + @Nullable RoaringBitmap getNullRowIds(int colId); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java index 8b41969e8e..cd9a729c8b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; @@ -87,14 +89,14 @@ public final class DataBlockUtils { } } - public static List<Object[]> extractRows(DataBlock dataBlock) { + public static List<Object[]> extractRows(DataBlock dataBlock, Function<CustomObject, Object> customObjectSerde) { DataSchema dataSchema = dataBlock.getDataSchema(); DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes(); RoaringBitmap[] nullBitmaps = extractNullBitmaps(dataBlock); int numRows = dataBlock.getNumberOfRows(); List<Object[]> rows = new ArrayList<>(numRows); for (int rowId = 0; rowId < numRows; rowId++) { - rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes, nullBitmaps)); + rows.add(extractRowFromDataBlock(dataBlock, rowId, columnDataTypes, nullBitmaps, customObjectSerde)); } return rows; } @@ -189,8 +191,8 @@ public final class DataBlockUtils { return nullBitmaps; } - public static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId, DataSchema.ColumnDataType[] dataTypes, - RoaringBitmap[] nullBitmaps) { + private static Object[] extractRowFromDataBlock(DataBlock dataBlock, int rowId, DataSchema.ColumnDataType[] dataTypes, + RoaringBitmap[] nullBitmaps, Function<CustomObject, Object> customObjectSerde) { int numColumns = nullBitmaps.length; Object[] row = new Object[numColumns]; for (int colId = 0; colId < numColumns; colId++) { @@ -250,6 +252,9 @@ public final class DataBlockUtils { case TIMESTAMP_ARRAY: row[colId] = DataSchema.ColumnDataType.TIMESTAMP_ARRAY.convert(dataBlock.getLongArray(rowId, colId)); break; + case OBJECT: + row[colId] = customObjectSerde.apply(dataBlock.getCustomObject(rowId, colId)); + break; default: throw new IllegalStateException( String.format("Unsupported data type: %s for column: %s", dataTypes[colId], colId)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java index d4e493589a..06ba4b34f1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.utils.BigDecimalUtils; import org.apache.pinot.spi.utils.ByteArray; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index 5a5f323c93..9bac9706a7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -21,10 +21,10 @@ package org.apache.pinot.common.datatable; import com.google.common.base.Preconditions; import java.io.IOException; import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import 
javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.utils.ByteArray; @@ -89,26 +89,6 @@ public interface DataTable { DataTable toDataOnlyDataTable(); - class CustomObject { - public static final int NULL_TYPE_VALUE = 100; - - private final int _type; - private final ByteBuffer _buffer; - - public CustomObject(int type, ByteBuffer buffer) { - _type = type; - _buffer = buffer; - } - - public int getType() { - return _type; - } - - public ByteBuffer getBuffer() { - return _buffer; - } - } - enum MetadataValueType { INT, LONG, STRING } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index 78fa5606b6..d4d27634f9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 9c7b985bfd..4854f5bf22 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -422,6 +422,8 @@ public class DataSchema { return toTimestampArray(value); case BYTES_ARRAY: return (byte[][]) value; + case OBJECT: + return (Serializable) value; default: throw new IllegalStateException(String.format("Cannot convert: '%s' to type: %s", value, this)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index fbfee474e9..a01f02a5c9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -59,7 +59,7 @@ import java.util.Map; import java.util.Set; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; -import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.core.query.distinct.DistinctTable; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; @@ -1244,7 +1244,7 @@ public class ObjectSerDeUtils { return SER_DES[objectTypeValue].serialize(value); } - public static <T> T deserialize(DataTable.CustomObject customObject) { + public static <T> T deserialize(CustomObject customObject) { return (T) SER_DES[customObject.getType()].deserialize(customObject.getBuffer()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java index 19ed9cd05a..573b3beadf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java @@ -28,11 +28,11 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.List; import javax.annotation.Nullable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datablock.ColumnarDataBlock; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datablock.RowDataBlock; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.core.common.ObjectSerDeUtils; @@ -524,7 +524,7 @@ public class DataBlockBuilder { byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); if (value == null) { byteBuffer.putInt(0); - builder._variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE); + builder._variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE); } else { int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue(); byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java index 9547924ac5..c0a8ff8ea1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import javax.annotation.Nullable; -import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTableUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.ObjectSerDeUtils; @@ -103,7 +103,7 @@ public abstract class BaseDataTableBuilder implements DataTableBuilder { _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); if (value == null) { _currentRowDataByteBuffer.putInt(0); - _variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE); + _variableSizeDataOutputStream.writeInt(CustomObject.NULL_TYPE_VALUE); } else { int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue(); byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 9b571ff3c4..e7d1cba0e4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -297,6 +297,8 @@ public class AggregationFunctionFactory { return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.SKEWNESS); case KURTOSIS: return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS); + case FOURTHMOMENT: + return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT); default: throw new IllegalArgumentException(); } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java index 6b1dd21e3c..89e9d3d8e5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; @@ -142,7 +143,7 @@ public class AggregationFunctionUtils { case DOUBLE: return dataTable.getDouble(rowId, colId); case OBJECT: - DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId); + CustomObject customObject = dataTable.getCustomObject(rowId, colId); return customObject != null ? ObjectSerDeUtils.deserialize(customObject) : null; default: throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java index 9cb06e4eeb..a6f4b707ff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java @@ -36,7 +36,7 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF private final Type _type; enum Type { - KURTOSIS, SKEWNESS + KURTOSIS, SKEWNESS, MOMENT } public FourthMomentAggregationFunction(ExpressionContext expression, Type type) { @@ -51,6 +51,8 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF return AggregationFunctionType.KURTOSIS; case SKEWNESS: return AggregationFunctionType.SKEWNESS; + case MOMENT: + return AggregationFunctionType.FOURTHMOMENT; default: throw new IllegalArgumentException("Unexpected type " + _type); } @@ -159,6 +161,10 @@ public class FourthMomentAggregationFunction extends BaseSingleInputAggregationF return m4.kurtosis(); case SKEWNESS: return m4.skew(); + case MOMENT: + // this should never happen, as we're not extracting + // final result when using this method + throw new UnsupportedOperationException("Fourth moment cannot be used as aggregation function directly"); default: throw new IllegalStateException("Unexpected value: " + _type); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java index af25afe24e..de65c18657 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTable; import 
org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.broker.BrokerResponseNative; @@ -77,7 +78,7 @@ public class DistinctDataTableReducer implements DataTableReducer { int numColumns = dataSchema.size(); if (numColumns == 1 && dataSchema.getColumnDataType(0) == ColumnDataType.OBJECT) { // DistinctTable is still being returned as a single object - DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0); + CustomObject customObject = dataTable.getCustomObject(0, 0); assert customObject != null; DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject); if (!distinctTable.isEmpty()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 960cf1cb07..cb5975d02b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerGauge; @@ -318,7 +319,7 @@ public class GroupByDataTableReducer implements DataTableReducer { break; case OBJECT: // TODO: Move ser/de into AggregationFunction interface - DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId); + CustomObject customObject = dataTable.getCustomObject(rowId, colId); if (customObject != null) { values[colId] = ObjectSerDeUtils.deserialize(customObject); } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java similarity index 54% copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java copy to pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java index 3610ce0c4d..e56e82298e 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/function/InternalReduceFunctions.java @@ -17,21 +17,29 @@ * under the License. */ -package org.apache.calcite.sql.fun; +package org.apache.pinot.core.query.reduce.function; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.util.Optionality; +import org.apache.pinot.segment.local.customobject.PinotFourthMoment; +import org.apache.pinot.spi.annotations.ScalarFunction; -public class PinotBoolAndAggregateFunction extends SqlAggFunction { +/** + * This class contains functions that are necessary for the multistage engine + * aggregations that need to be reduced after the initial aggregation to get + * the final result. 
+ */ +public class InternalReduceFunctions { + + private InternalReduceFunctions() { + } + + @ScalarFunction + public static double skewnessReduce(PinotFourthMoment fourthMoment) { + return fourthMoment.skew(); + } - public PinotBoolAndAggregateFunction() { - super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, - null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, - false, false, Optionality.FORBIDDEN); + @ScalarFunction + public static double kurtosisReduce(PinotFourthMoment fourthMoment) { + return fourthMoment.kurtosis(); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java index 22b0e92630..9b53e87354 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Random; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.CustomObject; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.datatable.DataTableFactory; @@ -734,7 +735,7 @@ public class DataTableSerDeTest { ERROR_MESSAGE); break; case OBJECT: - DataTable.CustomObject customObject = newDataTable.getCustomObject(rowId, colId); + CustomObject customObject = newDataTable.getCustomObject(rowId, colId); if (isNull) { Assert.assertNull(customObject, ERROR_MESSAGE); } else { diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java index 1828abd0e8..43ec25dfba 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java @@ -82,6 +82,7 @@ public class PinotQueryRuleSets { CoreRules.AGGREGATE_UNION_AGGREGATE, // reduce aggregate functions like AVG, STDDEV_POP etc. + PinotReduceAggregateFunctionsRule.INSTANCE, CoreRules.AGGREGATE_REDUCE_FUNCTIONS, // remove unnecessary sort rule diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java new file mode 100644 index 0000000000..5c6fd18e86 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotReduceAggregateFunctionsRule.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.calcite.rel.rules; + +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.PinotFourthMomentAggregateFunction; +import org.apache.calcite.sql.fun.PinotKurtosisAggregateFunction; +import org.apache.calcite.sql.fun.PinotOperatorTable; +import org.apache.calcite.sql.fun.PinotSkewnessAggregateFunction; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.CompositeList; + + +/** + * This rule rewrites aggregate functions when necessary for Pinot's + * multistage engine. For example, SKEWNESS must be rewritten into two + * parts: a multi-stage FOURTH_MOMENT calculation and then a scalar function + * that reduces the moment into the skewness at the end. This is to ensure + * that the aggregation computation can merge partial results from different + * intermediate nodes before reducing it into the final result. + * + * <p>This implementation follows closely with Calcite's + * {@link AggregateReduceFunctionsRule}. + */ +public class PinotReduceAggregateFunctionsRule extends RelOptRule { + + public static final PinotReduceAggregateFunctionsRule INSTANCE = + new PinotReduceAggregateFunctionsRule(PinotRuleUtils.PINOT_REL_FACTORY); + + private static final Set<String> FUNCTIONS = ImmutableSet.of( + PinotSkewnessAggregateFunction.SKEWNESS, + PinotKurtosisAggregateFunction.KURTOSIS + ); + + protected PinotReduceAggregateFunctionsRule(RelBuilderFactory factory) { + super(operand(Aggregate.class, any()), factory, null); + } + + @Override + public boolean matches(RelOptRuleCall call) { + if (call.rels.length < 1) { + return false; + } + + if (call.rel(0) instanceof Aggregate) { + Aggregate agg = call.rel(0); + for (AggregateCall aggCall : agg.getAggCallList()) { + if (canReduce(aggCall)) { + return true; + } + } + } + + return false; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate oldAggRel = call.rel(0); + reduceAggs(call, oldAggRel); + } + + private void reduceAggs(RelOptRuleCall ruleCall, Aggregate oldAggRel) { + RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder(); + + List<AggregateCall> oldCalls = oldAggRel.getAggCallList(); + final int groupCount = oldAggRel.getGroupCount(); + + final List<AggregateCall> newCalls = new ArrayList<>(); + final Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>(); + + final List<RexNode> projList = new ArrayList<>(); + + // pass through group key + for (int i = 0; i < groupCount; i++) { + projList.add(rexBuilder.makeInputRef(oldAggRel, i)); + } + + // List of input expressions. 
If a particular aggregate needs more, it + // will add an expression to the end, and we will create an extra project + final RelBuilder relBuilder = ruleCall.builder(); + relBuilder.push(oldAggRel.getInput()); + final List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields()); + + // create new aggregate function calls and rest of project list together + for (AggregateCall oldCall : oldCalls) { + projList.add( + reduceAgg(oldAggRel, oldCall, newCalls, aggCallMapping, inputExprs)); + } + + final int extraArgCount = inputExprs.size() - relBuilder.peek().getRowType().getFieldCount(); + if (extraArgCount > 0) { + relBuilder.project(inputExprs, + CompositeList.of( + relBuilder.peek().getRowType().getFieldNames(), + Collections.nCopies(extraArgCount, null))); + } + newAggregateRel(relBuilder, oldAggRel, newCalls); + newCalcRel(relBuilder, oldAggRel.getRowType(), projList); + ruleCall.transformTo(relBuilder.build()); + } + + private RexNode reduceAgg(Aggregate oldAggRel, AggregateCall oldCall, List<AggregateCall> newCalls, + Map<AggregateCall, RexNode> aggCallMapping, List<RexNode> inputExprs) { + if (canReduce(oldCall)) { + switch (oldCall.getAggregation().getName()) { + case PinotSkewnessAggregateFunction.SKEWNESS: + return reduceFourthMoment(oldAggRel, oldCall, newCalls, aggCallMapping, false); + case PinotKurtosisAggregateFunction.KURTOSIS: + return reduceFourthMoment(oldAggRel, oldCall, newCalls, aggCallMapping, true); + default: + throw new IllegalStateException("Unexpected aggregation name " + oldCall.getAggregation().getName()); + } + } else { + // anything else: preserve original call + RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder(); + final int nGroups = oldAggRel.getGroupCount(); + return rexBuilder.addAggCall(oldCall, + nGroups, + newCalls, + aggCallMapping, + oldAggRel.getInput()::fieldIsNullable); + } + } + + private RexNode reduceFourthMoment(Aggregate oldAggRel, AggregateCall oldCall, List<AggregateCall> newCalls, + Map<AggregateCall, RexNode> aggCallMapping, boolean isKurtosis) { + final int nGroups = oldAggRel.getGroupCount(); + final RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder(); + final AggregateCall fourthMomentCall = + AggregateCall.create(PinotFourthMomentAggregateFunction.INSTANCE, + oldCall.isDistinct(), + oldCall.isApproximate(), + oldCall.ignoreNulls(), + oldCall.getArgList(), + oldCall.filterArg, + oldCall.distinctKeys, + oldCall.collation, + oldAggRel.getGroupCount(), + oldAggRel.getInput(), + null, + null); + + RexNode fmRef = rexBuilder.addAggCall(fourthMomentCall, nGroups, newCalls, + aggCallMapping, oldAggRel.getInput()::fieldIsNullable); + + final RexNode skewRef = rexBuilder.makeCall( + isKurtosis ? 
PinotOperatorTable.KURTOSIS_REDUCE : PinotOperatorTable.SKEWNESS_REDUCE, + fmRef); + return rexBuilder.makeCast(oldCall.getType(), skewRef); + } + + private boolean canReduce(AggregateCall call) { + return FUNCTIONS.contains(call.getAggregation().getName()); + } + + protected void newAggregateRel(RelBuilder relBuilder, + Aggregate oldAggregate, + List<AggregateCall> newCalls) { + relBuilder.aggregate( + relBuilder.groupKey(oldAggregate.getGroupSet(), oldAggregate.getGroupSets()), + newCalls); + } + + protected void newCalcRel(RelBuilder relBuilder, + RelDataType rowType, + List<RexNode> exprs) { + relBuilder.project(exprs, rowType.getFieldNames()); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java index 3610ce0c4d..7963fd02e3 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java @@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality; public class PinotBoolAndAggregateFunction extends SqlAggFunction { - public PinotBoolAndAggregateFunction() { + public static final PinotBoolAndAggregateFunction INSTANCE = new PinotBoolAndAggregateFunction(); + + private PinotBoolAndAggregateFunction() { super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, false, false, Optionality.FORBIDDEN); diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java index 3d336c1070..547edef887 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolOrAggregateFunction.java @@ -29,7 +29,9 @@ import org.apache.calcite.util.Optionality; public class PinotBoolOrAggregateFunction extends SqlAggFunction { - public PinotBoolOrAggregateFunction() { + public static final PinotBoolOrAggregateFunction INSTANCE = new PinotBoolOrAggregateFunction(); + + private PinotBoolOrAggregateFunction() { super("BOOL_OR", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, false, false, Optionality.FORBIDDEN); diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java similarity index 73% copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java index 3610ce0c4d..2d9ca9e3a0 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotFourthMomentAggregateFunction.java @@ -24,14 +24,17 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Optionality; -public class 
PinotBoolAndAggregateFunction extends SqlAggFunction { +public class PinotFourthMomentAggregateFunction extends SqlAggFunction { - public PinotBoolAndAggregateFunction() { - super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, - null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, + public static final PinotFourthMomentAggregateFunction INSTANCE = new PinotFourthMomentAggregateFunction(); + + public PinotFourthMomentAggregateFunction() { + super("FOURTHMOMENT", null, SqlKind.OTHER_FUNCTION, ReturnTypes.explicit(SqlTypeName.OTHER), + null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION, false, false, Optionality.FORBIDDEN); } } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java similarity index 74% copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java index 3610ce0c4d..f2d7639625 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotKurtosisAggregateFunction.java @@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.util.Optionality; -public class PinotBoolAndAggregateFunction extends SqlAggFunction { +public class PinotKurtosisAggregateFunction extends SqlAggFunction { - public PinotBoolAndAggregateFunction() { - super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, - null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, + public static final String KURTOSIS = "KURTOSIS"; + public static final PinotKurtosisAggregateFunction INSTANCE = new PinotKurtosisAggregateFunction(); + + public PinotKurtosisAggregateFunction() { + super(KURTOSIS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE, + null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION, false, false, Optionality.FORBIDDEN); } } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java index 8f46a0db4e..17af5a7d6d 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotOperatorTable.java @@ -23,7 +23,11 @@ import java.util.ArrayList; import java.util.List; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.validate.SqlNameMatchers; import org.apache.calcite.util.Util; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -46,8 +50,15 @@ public class PinotOperatorTable extends SqlStdOperatorTable { private static @MonotonicNonNull PinotOperatorTable _instance; public static final SqlFunction COALESCE = new PinotSqlCoalesceFunction(); - public static final SqlAggFunction BOOL_AND = new PinotBoolAndAggregateFunction(); - public static final SqlAggFunction BOOL_OR = new PinotBoolOrAggregateFunction(); + public static final 
SqlFunction SKEWNESS_REDUCE = new SqlFunction("SKEWNESS_REDUCE", SqlKind.OTHER_FUNCTION, + ReturnTypes.DOUBLE, null, OperandTypes.BINARY, SqlFunctionCategory.USER_DEFINED_FUNCTION); + public static final SqlFunction KURTOSIS_REDUCE = new SqlFunction("KURTOSIS_REDUCE", SqlKind.OTHER_FUNCTION, + ReturnTypes.DOUBLE, null, OperandTypes.BINARY, SqlFunctionCategory.USER_DEFINED_FUNCTION); + + public static final SqlAggFunction BOOL_AND = PinotBoolAndAggregateFunction.INSTANCE; + public static final SqlAggFunction BOOL_OR = PinotBoolOrAggregateFunction.INSTANCE; + public static final SqlAggFunction SKEWNESS = PinotSkewnessAggregateFunction.INSTANCE; + public static final SqlAggFunction KURTOSIS = PinotKurtosisAggregateFunction.INSTANCE; // TODO: clean up lazy init by using Suppliers.memorized(this::computeInstance) and make getter wrapped around // supplier instance. this should replace all lazy init static objects in the codebase diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java similarity index 74% copy from pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java copy to pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java index 3610ce0c4d..16857e8568 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotBoolAndAggregateFunction.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/sql/fun/PinotSkewnessAggregateFunction.java @@ -27,11 +27,14 @@ import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.util.Optionality; -public class PinotBoolAndAggregateFunction extends SqlAggFunction { +public class PinotSkewnessAggregateFunction extends SqlAggFunction { - public PinotBoolAndAggregateFunction() { - super("BOOL_AND", null, SqlKind.OTHER_FUNCTION, ReturnTypes.BOOLEAN, - null, OperandTypes.BOOLEAN, SqlFunctionCategory.USER_DEFINED_FUNCTION, + public static final String SKEWNESS = "SKEWNESS"; + public static final PinotSkewnessAggregateFunction INSTANCE = new PinotSkewnessAggregateFunction(); + + public PinotSkewnessAggregateFunction() { + super(SKEWNESS, null, SqlKind.OTHER_FUNCTION, ReturnTypes.DOUBLE, + null, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION, false, false, Optionality.FORBIDDEN); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java index 72976185aa..80218c6442 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java @@ -165,6 +165,8 @@ public final class RelToStageConverter { case CHAR: case VARCHAR: return DataSchema.ColumnDataType.STRING; + case OTHER: + return DataSchema.ColumnDataType.OBJECT; case BINARY: case VARBINARY: return DataSchema.ColumnDataType.BYTES; @@ -174,7 +176,11 @@ public final class RelToStageConverter { } public static FieldSpec.DataType convertToFieldSpecDataType(RelDataType relDataType) { - return convertToColumnDataType(relDataType).toDataType(); + DataSchema.ColumnDataType columnDataType = convertToColumnDataType(relDataType); + if (columnDataType == DataSchema.ColumnDataType.OBJECT) { + return FieldSpec.DataType.BYTES; + } + return 
columnDataType.toDataType(); } public static PinotDataType convertToPinotDataType(RelDataType relDataType) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java index 109764bf02..cad627b925 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java @@ -31,6 +31,7 @@ import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.common.BlockDocIdValueSet; import org.apache.pinot.core.common.BlockMetadata; import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.common.datablock.DataBlockBuilder; @@ -87,7 +88,7 @@ public class TransferableBlock implements Block { if (_container == null) { switch (_type) { case ROW: - _container = DataBlockUtils.extractRows(_dataBlock); + _container = DataBlockUtils.extractRows(_dataBlock, ObjectSerDeUtils::deserialize); break; case COLUMNAR: case METADATA: diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index a82048949f..3182dba91f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -26,7 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; @@ -35,6 +35,7 @@ import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.segment.local.customobject.PinotFourthMoment; import org.apache.pinot.spi.data.FieldSpec; @@ -73,13 +74,14 @@ public class AggregateOperator extends MultiStageOperator { // groupSet has to be a list of InputRef and cannot be null // TODO: Add these two checks when we confirm we can handle error in upstream ctor call. 
public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, - List<RexExpression> aggCalls, List<RexExpression> groupSet) { - this(inputOperator, dataSchema, aggCalls, groupSet, AggregateOperator.Accumulator.MERGERS); + List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema) { + this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS); } @VisibleForTesting AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, - List<RexExpression> aggCalls, List<RexExpression> groupSet, Map<String, Merger> mergers) { + List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema inputSchema, Map<String, + Function<DataSchema.ColumnDataType, Merger>> mergers) { _inputOperator = inputOperator; _groupSet = groupSet; _upstreamErrorBlock = null; @@ -96,7 +98,7 @@ public class AggregateOperator extends MultiStageOperator { if (!mergers.containsKey(functionName)) { throw new IllegalStateException("Unexpected value: " + functionName); } - _accumulators[i] = new Accumulator(agg, mergers.get(functionName)); + _accumulators[i] = new Accumulator(agg, mergers, functionName, inputSchema); } _groupByKeyHolder = new HashMap<>(); @@ -184,6 +186,14 @@ public class AggregateOperator extends MultiStageOperator { return false; } + private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) { + Object[] keyElements = new Object[groupSet.size()]; + for (int i = 0; i < groupSet.size(); i++) { + keyElements[i] = row[((RexExpression.InputRef) groupSet.get(i)).getIndex()]; + } + return new Key(keyElements); + } + private static Object mergeSum(Object left, Object right) { return ((Number) left).doubleValue() + ((Number) right).doubleValue(); } @@ -209,37 +219,81 @@ public class AggregateOperator extends MultiStageOperator { return ((Boolean) left) || ((Boolean) right); } - private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) { - Object[] keyElements = new Object[groupSet.size()]; - for (int i = 0; i < groupSet.size(); i++) { - keyElements[i] = row[((RexExpression.InputRef) groupSet.get(i)).getIndex()]; + // NOTE: the below two classes are needed depending on where the + // fourth moment is being executed - if the leaf stage gets a + // fourth moment pushed down to it, it will return a PinotFourthMoment + // as the result of the aggregation. If it is not possible (e.g. the + // input to the aggregate requires the result of a JOIN - such as + // FOURTHMOMENT(t1.a + t2.a)) then the input to the aggregate in the + // intermediate stage is a numeric. 
+ + private static class MergeFourthMomentNumeric implements Merger { + + @Override + public Object merge(Object left, Object right) { + ((PinotFourthMoment) left).increment(((Number) right).doubleValue()); + return left; + } + + @Override + public Object initialize(Object other) { + PinotFourthMoment moment = new PinotFourthMoment(); + moment.increment(((Number) other).doubleValue()); + return moment; } - return new Key(keyElements); } - interface Merger extends BiFunction<Object, Object, Object> { + private static class MergeFourthMomentObject implements Merger { + + @Override + public Object merge(Object left, Object right) { + PinotFourthMoment agg = (PinotFourthMoment) left; + agg.combine((PinotFourthMoment) right); + return agg; + } + } + + interface Merger { + /** + * Initializes the merger based on the first input + */ + default Object initialize(Object other) { + return other; + } + + /** + * Merges the existing aggregate (the result of {@link #initialize(Object)}) with + * the new value coming in (which may be an aggregate in and of itself). + */ + Object merge(Object agg, Object value); } private static class Accumulator { - private static final Map<String, Merger> MERGERS = ImmutableMap - .<String, Merger>builder() - .put("SUM", AggregateOperator::mergeSum) - .put("$SUM", AggregateOperator::mergeSum) - .put("$SUM0", AggregateOperator::mergeSum) - .put("MIN", AggregateOperator::mergeMin) - .put("$MIN", AggregateOperator::mergeMin) - .put("$MIN0", AggregateOperator::mergeMin) - .put("MAX", AggregateOperator::mergeMax) - .put("$MAX", AggregateOperator::mergeMax) - .put("$MAX0", AggregateOperator::mergeMax) - .put("COUNT", AggregateOperator::mergeCount) - .put("BOOL_AND", AggregateOperator::mergeBoolAnd) - .put("$BOOL_AND", AggregateOperator::mergeBoolAnd) - .put("$BOOL_AND0", AggregateOperator::mergeBoolAnd) - .put("BOOL_OR", AggregateOperator::mergeBoolOr) - .put("$BOOL_OR", AggregateOperator::mergeBoolOr) - .put("$BOOL_OR0", AggregateOperator::mergeBoolOr) + private static final Map<String, Function<DataSchema.ColumnDataType, Merger>> MERGERS = ImmutableMap + .<String, Function<DataSchema.ColumnDataType, Merger>>builder() + .put("SUM", cdt -> AggregateOperator::mergeSum) + .put("$SUM", cdt -> AggregateOperator::mergeSum) + .put("$SUM0", cdt -> AggregateOperator::mergeSum) + .put("MIN", cdt -> AggregateOperator::mergeMin) + .put("$MIN", cdt -> AggregateOperator::mergeMin) + .put("$MIN0", cdt -> AggregateOperator::mergeMin) + .put("MAX", cdt -> AggregateOperator::mergeMax) + .put("$MAX", cdt -> AggregateOperator::mergeMax) + .put("$MAX0", cdt -> AggregateOperator::mergeMax) + .put("COUNT", cdt -> AggregateOperator::mergeCount) + .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) + .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd) + .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd) + .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) + .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr) + .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr) + .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT + ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) + .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT + ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) + .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT + ? 
new MergeFourthMomentObject() : new MergeFourthMomentNumeric()) .build(); final int _inputRef; @@ -247,17 +301,21 @@ public class AggregateOperator extends MultiStageOperator { final Map<Key, Object> _results = new HashMap<>(); final Merger _merger; - Accumulator(RexExpression.FunctionCall aggCall, Merger merger) { - _merger = merger; + Accumulator(RexExpression.FunctionCall aggCall, Map<String, Function<DataSchema.ColumnDataType, Merger>> merger, + String functionName, DataSchema inputSchema) { // agg function operand should either be a InputRef or a Literal + DataSchema.ColumnDataType dataType; RexExpression rexExpression = toAggregationFunctionOperand(aggCall); if (rexExpression instanceof RexExpression.InputRef) { _inputRef = ((RexExpression.InputRef) rexExpression).getIndex(); _literal = null; + dataType = inputSchema.getColumnDataType(_inputRef); } else { _inputRef = -1; _literal = ((RexExpression.Literal) rexExpression).getValue(); + dataType = DataSchema.ColumnDataType.fromDataType(rexExpression.getDataType(), false); } + _merger = merger.get(functionName).apply(dataType); } void accumulate(Key key, Object[] row) { @@ -268,9 +326,9 @@ public class AggregateOperator extends MultiStageOperator { Object value = _inputRef == -1 ? _literal : row[_inputRef]; if (currentRes == null) { - keys.put(key, value); + keys.put(key, _merger.initialize(value)); } else { - Object mergedResult = _merger.apply(currentRes, value); + Object mergedResult = _merger.merge(currentRes, value); _results.put(key, mergedResult); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 365d04f863..e794a84194 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -282,7 +282,11 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { for (int colId = 0; colId < row.length; colId++) { Object value = row[colId]; if (value != null) { - resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value); + if (dataSchema.getColumnDataType(colId) == DataSchema.ColumnDataType.OBJECT) { + resultRow[colId] = value; + } else { + resultRow[colId] = dataSchema.getColumnDataType(colId).convert(value); + } } } return resultRow; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 57a0949bc6..aaa140463f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -84,7 +84,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), - node.getGroupSet()); + node.getGroupSet(), node.getInputs().get(0).getDataSchema()); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index e29acb7206..25b7aa5e8b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.QueryPlan; @@ -147,7 +148,7 @@ public class QueryDispatcher { for (int colId = 0; colId < numColumns; colId++) { nullBitmaps[colId] = dataBlock.getNullRowIds(colId); } - List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock); + List<Object[]> rawRows = DataBlockUtils.extractRows(dataBlock, ObjectSerDeUtils::deserialize); int rowId = 0; for (Object[] rawRow : rawRows) { Object[] row = new Object[numColumns]; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 0996b7f4d0..aa7394a6a8 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -69,8 +69,9 @@ public class AggregateOperatorTest { Mockito.when(_input.nextBlock()) .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"))); + DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build @@ -89,8 +90,9 @@ public class AggregateOperatorTest { Mockito.when(_input.nextBlock()) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); // When: TransferableBlock block = operator.nextBlock(); @@ -113,7 +115,7 @@ public class AggregateOperatorTest { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group); + AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block @@ -137,7 +139,7 @@ public class AggregateOperatorTest { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0996b7f4d0..aa7394a6a8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -69,8 +69,9 @@ public class AggregateOperatorTest {
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
 
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build
@@ -89,8 +90,9 @@
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -113,7 +115,7 @@
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block
@@ -137,7 +139,7 @@
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -163,7 +165,7 @@
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -191,10 +193,11 @@
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     AggregateOperator.Merger merger = Mockito.mock(AggregateOperator.Merger.class);
-    Mockito.when(merger.apply(Mockito.any(), Mockito.any())).thenReturn(12d);
+    Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d);
+    Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d);
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, ImmutableMap.of(
-        "SUM", merger
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of(
+        "SUM", cdt -> merger
     ));
 
     // When:
@@ -203,7 +206,8 @@
     // Then:
     // should call merger twice, one from second row in first block and two from the first row
    // in second block
-    Mockito.verify(merger, Mockito.times(2)).apply(Mockito.any(), Mockito.any());
+    Mockito.verify(merger, Mockito.times(1)).initialize(Mockito.any());
+    Mockito.verify(merger, Mockito.times(2)).merge(Mockito.any(), Mockito.any());
     Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{1, 12d},
         "Expected two columns (group by key, agg value)");
   }
@@ -213,9 +217,10 @@
     MultiStageOperator upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     // Create an aggregation call with sum for first column and group by second column.
     RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0));
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
     AggregateOperator sum0GroupBy1 =
         new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
-            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)));
+            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema);
     TransferableBlock result = sum0GroupBy1.getNextBlock();
     while (result.isNoOpBlock()) {
       result = sum0GroupBy1.getNextBlock();
@@ -237,9 +242,10 @@
     );
     List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
     DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
+    DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
 
     // When:
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
   }
 
   @Test
@@ -256,7 +262,7 @@
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema);
 
     // When:
     TransferableBlock block = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/resources/queries/Skew.json b/pinot-query-runtime/src/test/resources/queries/Skew.json
new file mode 100644
index 0000000000..7f16a6970e
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/Skew.json
@@ -0,0 +1,86 @@
+{
+  "skew": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "groupingCol", "type": "STRING"},
+          {"name": "partitionCol", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", "key1", 1],
+          ["a", "key2", 2],
+          ["a", "key3", 3],
+          ["a", "key1", 4],
+          ["a", "key2", 4],
+          ["a", "key3", 4],
+          ["a", "key1", 7],
+          ["a", "key2", 9],
+          ["b", "key3", 1],
+          ["b", "key1", 2],
+          ["b", "key2", 3],
+          ["b", "key3", 4],
+          ["b", "key1", 4],
+          ["b", "key2", 4],
+          ["b", "key3", 7],
+          ["b", "key1", 9]
+        ],
+        "partitionColumns": [
+          "partitionCol"
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "skew for int column",
+        "sql": "SELECT groupingCol, SKEWNESS(val), KURTOSIS(val) FROM {tbl} GROUP BY groupingCol",
+        "outputs": [
+          ["a", 0.8647536091225356, 0.3561662049861511],
+          ["b", 0.8647536091225356, 0.3561662049861511]
+        ]
+      },
+      {
+        "description": "no group by clause",
+        "sql": "SELECT SKEWNESS(val), KURTOSIS(val) FROM {tbl} WHERE groupingCol='a'",
+        "outputs": [
+          [0.8647536091225356, 0.3561662049861511]
+        ]
+      },
+      {
+        "sql": "SELECT t1.groupingCol, SKEWNESS(t1.val + t2.val), KURTOSIS(t1.val + t2.val) FROM {tbl} AS t1 LEFT JOIN {tbl2} AS t2 USING (partitionCol) GROUP BY t1.groupingCol",
+        "outputs": [
+          ["a", 0.5412443772804422, -0.001438580062540293],
+          ["b", 0.5412443772804422, -0.001438580062540293]
+        ]
+      }
+    ]
+  }
+}
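The Skew.json cases above and the FOURTHMOMENT entry below both hinge on keeping enough per-group moment state to derive skewness and kurtosis at reduce time. As a rough sketch of the underlying math only: with population (uncorrected) estimators, skewness and excess kurtosis follow from the second, third and fourth central moments as below. Pinot's SKEWNESS/KURTOSIS implementation may apply different bias corrections, so this naive version is not guaranteed to reproduce the expected values in the test file; MomentSketch is a hypothetical class.

// Naive population estimators for skewness and excess kurtosis from central moments.
// Illustration only; the estimator backed by Pinot's fourth-moment state may differ.
final class MomentSketch {

  static double[] skewnessAndKurtosis(double[] values) {
    int n = values.length;
    double mean = 0;
    for (double v : values) {
      mean += v;
    }
    mean /= n;

    double m2 = 0;
    double m3 = 0;
    double m4 = 0;
    for (double v : values) {
      double d = v - mean;
      m2 += d * d;
      m3 += d * d * d;
      m4 += d * d * d * d;
    }
    m2 /= n;
    m3 /= n;
    m4 /= n;

    double skewness = m3 / Math.pow(m2, 1.5);
    double excessKurtosis = m4 / (m2 * m2) - 3;
    return new double[]{skewness, excessKurtosis};
  }
}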
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 8f53122bd4..7c355d0057 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -67,6 +67,7 @@ public enum AggregationFunctionType {
   STDDEVSAMP("stdDevSamp"),
   SKEWNESS("skewness"),
   KURTOSIS("kurtosis"),
+  FOURTHMOMENT("fourthmoment"),
 
   // Geo aggregation functions
   STUNION("STUnion"),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org