Jackie-Jiang commented on code in PR #8611: URL: https://github.com/apache/pinot/pull/8611#discussion_r864331170
########## pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java: ########## @@ -31,7 +31,6 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; - Review Comment: (minor) Revert ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> destColumnToSrcColumn = new HashMap<>(); Review Comment: Suggest keeping a single `Map<String, Pair<String, ValueAggregator>>` to reduce overhead ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC RECORD_ID_MAP); } + private boolean isAggregateMetricsEnabled() { + return _aggregateMetrics || _ingestionAggregator.isEnabled(); Review Comment: This can be simplified to `return _recordIdMap != null`, and we don't need to 
track `disabled` inside the `IngestionAggregator` ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java: ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table; Review Comment: Move this into `ingestion` folder (same level as `TransformConfig`) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); Review Comment: (minor) ```suggestion return new IngestionAggregator(Collections.emptyMap(), Collections.emptyMap()); ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if 
(aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); + for (AggregationConfig aggregationConfig : aggregationConfigs) { + String columnName = aggregationConfig.getColumnName(); + if (schema != null) { + Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName Review Comment: We should also check if it is a metric. We cannot aggregate a dimension/time column ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java: ########## @@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion") private final List<TransformConfig> _transformConfigs; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") + private final List<AggregationConfig> _aggregationConfigs; Review Comment: (minor) Suggest adding it as the last member variable because it is applied last during ingestion after filter/transform/complexTypeHandling ########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java: ########## @@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() { private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; private static final String SEGMENT_NAME = "testSegment__0__0__155555"; - private static final String STEAM_NAME = "testStream"; + private static final String STREAM_NAME = "testStream"; + + public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Review Comment: Suggest adding an extra parameter `preAggregationConfigs` to the last method (with the actual implementation) and 
call that in this method. We don't want to maintain 2 methods with implementation ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), Review Comment: (minor) indexingConfig can never be `null` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { Review Comment: (minor) Can be simplified, and `segmentConfig` should never be `null` ```suggestion if (CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) { ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> 
destColumnToSrcColumn = new HashMap<>(); + Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>(); + + for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) { + ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction()); Review Comment: This API has changed. Please rebase the latest master ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java: ########## @@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion") private final List<TransformConfig> _transformConfigs; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") + private final List<AggregationConfig> _aggregationConfigs; + @JsonPropertyDescription("Config related to handling complex type") private final ComplexTypeConfig _complexTypeConfig; @JsonCreator public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig, @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig, @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig, + @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs, Review Comment: (minor) Put it as the last argument, same for the assignment and getter ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + static public void validateIngestionAggregation(String name) { Review Comment: ```suggestion public static void validateIngestionAggregation(String name) { ``` ########## pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java: ########## @@ -281,9 
+284,10 @@ public void testSerDe() Map<String, String> prefixesToRename = new HashMap<>(); IngestionConfig ingestionConfig = new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"), - new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs, - new ComplexTypeConfig(fieldsToUnnest, ".", - ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename)); + new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), aggregationConfigs, Review Comment: (code style) Please apply the [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#intellij), and reformat all the changes ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + static public void validateIngestionAggregation(String name) { + /** + * Currently only, ValueAggregators with fixed width types are allowed, so MIN, MAX, SUM, and COUNT. The reason + * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex} + * supports random inserts and lookups. The + * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex only supports + * sequential inserts. + */ Review Comment: Move this javadoc above the method declaration ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> 
destColumnToSrcColumn = new HashMap<>(); + Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>(); + + for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) { + ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction()); + + // validation is also done when the table is created, this is just a sanity check. + Preconditions.checkState(!segmentConfig.aggregateMetrics(), Review Comment: (minor) Move this check out of the for loop ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); + for (AggregationConfig aggregationConfig : aggregationConfigs) { + String columnName = aggregationConfig.getColumnName(); + if (schema != null) { + Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName + + "' of the aggregation function must be present in the schema"); + } + String aggregationFunction = aggregationConfig.getAggregationFunction(); + if (columnName == null || aggregationFunction == null) { + throw new IllegalStateException( + "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig); + } + + if (!aggregationColumns.add(columnName)) { + throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'"); + } + ExpressionContext expressionContext; + try 
{ + expressionContext = RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction()); + } catch (Exception e) { + throw new IllegalStateException( + "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e); + } + Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION, + "aggregation function must be a function for: %s", aggregationConfig); + + FunctionContext functionContext = expressionContext.getFunction(); + validateIngestionAggregation(functionContext.getFunctionName()); + Preconditions.checkState(functionContext.getArguments().size() == 1, + "aggregation function can only have one argument: %s", aggregationConfig); + + ExpressionContext argument = functionContext.getArguments().get(0); + Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER, + "aggregator function argument must be a identifier: %s", aggregationConfig); + + Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == null, Review Comment: Ideally we want to support something like `met1 = SUM(met1)` (`src` and `dest` use the same name) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); Review Comment: We should also check if all metrics are covered. 
We need to pre-aggregate on all metric columns ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) { private void aggregateMetrics(GenericRow row, int docId) { for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) { - String column = metricFieldSpec.getName(); + String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName()); Object value = row.getValue(column); - MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex; + MutableForwardIndex forwardIndex = _indexContainerMap.get(metricFieldSpec.getName())._forwardIndex; DataType dataType = metricFieldSpec.getDataType(); - switch (dataType) { - case INT: - forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId)); - break; - case LONG: - forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId)); - break; - case FLOAT: - forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId)); - break; - case DOUBLE: - forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId)); - break; - default: - throw new UnsupportedOperationException( - "Unsupported data type: " + dataType + " for aggregate metric column: " + column); + ValueAggregator valueAggregator = _ingestionAggregator.getAggregator(metricFieldSpec.getName()); + + if (valueAggregator != null) { + switch (valueAggregator.getAggregatedValueType()) { Review Comment: We should convert the result type to the forward index type if the type does not match. User might want to store `COUNT` in `INT` column in certain cases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org