Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r864331170


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java:
##########
@@ -31,7 +31,6 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 
-

Review Comment:
   (minor) Revert



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();

Review Comment:
   Suggest keeping a single `Map<String, Pair<String, ValueAggregator>>` instead of two parallel maps, to reduce overhead
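
A minimal sketch of the single-map idea, not the PR's actual code. `SourceAndAggregator` is a hypothetical holder playing the role of `Pair`, and `LongBinaryOperator` is only a stand-in for `ValueAggregator`; neither is a Pinot API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

// Hypothetical stand-in for IngestionAggregator's two parallel maps.
final class CombinedAggregatorMap {
  // Holds what the two maps stored separately: the source column and its aggregator.
  static final class SourceAndAggregator {
    final String sourceColumn;
    final LongBinaryOperator aggregator; // stand-in for ValueAggregator

    SourceAndAggregator(String sourceColumn, LongBinaryOperator aggregator) {
      this.sourceColumn = sourceColumn;
      this.aggregator = aggregator;
    }
  }

  private final Map<String, SourceAndAggregator> destColumnToSourceAndAggregator = new HashMap<>();

  void register(String destColumn, String sourceColumn, LongBinaryOperator aggregator) {
    destColumnToSourceAndAggregator.put(destColumn, new SourceAndAggregator(sourceColumn, aggregator));
  }

  // One hash lookup returns both pieces; null means the column is not aggregated.
  SourceAndAggregator lookup(String destColumn) {
    return destColumnToSourceAndAggregator.get(destColumn);
  }
}
```

With one map, the per-record path does a single lookup per destination column instead of two, and the two pieces of state cannot drift apart.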



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC
         RECORD_ID_MAP);
   }
 
+  private boolean isAggregateMetricsEnabled() {
+    return _aggregateMetrics || _ingestionAggregator.isEnabled();

Review Comment:
   This can be simplified to `return _recordIdMap != null`, and we don't need to track `disabled` inside the `IngestionAggregator`



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;

Review Comment:
   Move this into `ingestion` folder (same level as `TransformConfig`)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());

Review Comment:
   (minor)
   ```suggestion
         return new IngestionAggregator(Collections.emptyMap(), Collections.emptyMap());
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName

Review Comment:
   We should also check if it is a metric. We cannot aggregate a dimension/time column
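
A rough sketch of the extra check, under stated assumptions: the `FieldType` enum and the `Map`-based schema below are hypothetical stand-ins so the example is self-contained, and are not the Pinot `Schema`/`FieldSpec` API.

```java
import java.util.Map;

final class MetricDestinationCheck {
  // Hypothetical stand-in for the schema's field types.
  enum FieldType { DIMENSION, METRIC, TIME }

  // Rejects a destination column that is missing from the schema or is not a metric.
  static void validateDestination(Map<String, FieldType> schemaFieldTypes, String columnName) {
    FieldType fieldType = schemaFieldTypes.get(columnName);
    if (fieldType == null) {
      throw new IllegalStateException(
          "The destination column '" + columnName + "' of the aggregation function must be present in the schema");
    }
    if (fieldType != FieldType.METRIC) {
      throw new IllegalStateException(
          "The destination column '" + columnName + "' of the aggregation function must be a metric column");
    }
  }
}
```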



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;

Review Comment:
   (minor) Suggest adding it as the last member variable because it is applied last during ingestion, after filter/transform/complexTypeHandling



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java:
##########
@@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() {
 
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
-  private static final String STEAM_NAME = "testStream";
+  private static final String STREAM_NAME = "testStream";
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,

Review Comment:
   Suggest adding an extra parameter `preAggregationConfigs` to the last method (the one with the actual implementation) and calling it from this method. We don't want to maintain 2 methods with implementation



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),

Review Comment:
   (minor) indexingConfig can never be `null`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {

Review Comment:
   (minor) Can be simplified, and `segmentConfig` should never be `null`
   ```suggestion
       if (CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
   ```
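
For reference, the Commons Collections `CollectionUtils.isEmpty` is null-safe, so it covers both the `null` check and the `size() == 0` check on the config list. A plain-Java sketch of the equivalent logic (the `EmptyCheck` class name is hypothetical, used only for illustration):

```java
import java.util.Collection;

final class EmptyCheck {
  // Same contract as a null-safe isEmpty: true for null and for empty collections.
  static boolean isEmpty(Collection<?> collection) {
    return collection == null || collection.isEmpty();
  }
}
```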



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());

Review Comment:
   This API has changed. Please rebase the latest master



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;
+
   @JsonPropertyDescription("Config related to handling complex type")
   private final ComplexTypeConfig _complexTypeConfig;
 
   @JsonCreator
   public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig,
       @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig,
       @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
+      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs,

Review Comment:
   (minor) Put it as the last argument, same for the assignment and getter



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {

Review Comment:
   ```suggestion
     public static void validateIngestionAggregation(String name) {
   ```



##########
pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java:
##########
@@ -281,9 +284,10 @@ public void testSerDe()
       Map<String, String> prefixesToRename = new HashMap<>();
       IngestionConfig ingestionConfig =
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
-              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
-              new ComplexTypeConfig(fieldsToUnnest, ".",
-                      ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename));
+              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), aggregationConfigs,

Review Comment:
   (code style) Please apply the [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#intellij), and reformat all the changes



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {
+    /**
+     * Currently only, ValueAggregators with fixed width types are allowed, so MIN, MAX, SUM, and COUNT. The reason
+     * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+     * supports random inserts and lookups. The
+     * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex only supports
+     * sequential inserts.
+     */

Review Comment:
   Move this javadoc above the method declaration



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());
+
+      // validation is also done when the table is created, this is just a sanity check.
+      Preconditions.checkState(!segmentConfig.aggregateMetrics(),

Review Comment:
   (minor) Move this check out of the for loop



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be a identifier: %s", aggregationConfig);
+
+          Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == null,

Review Comment:
   Ideally we want to support something like `met1 = SUM(met1)` (`src` and `dest` use the same name)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();

Review Comment:
   We should also check if all metrics are covered. We need to pre-aggregate on all metric columns
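
One possible shape for the coverage check, as a sketch only: it assumes the metric column names from the schema and the destination columns of the aggregation configs are already available as plain collections of strings. The class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

final class MetricCoverageCheck {
  // Returns the metric columns that have no aggregation config (sorted for stable error messages).
  static Set<String> uncoveredMetrics(Collection<String> metricColumns, Collection<String> aggregatedColumns) {
    Set<String> uncovered = new TreeSet<>(metricColumns);
    uncovered.removeAll(aggregatedColumns);
    return uncovered;
  }

  // Fails validation when any metric column is left without an aggregation config.
  static void validateAllMetricsCovered(Collection<String> metricColumns, Collection<String> aggregatedColumns) {
    Set<String> uncovered = uncoveredMetrics(metricColumns, aggregatedColumns);
    if (!uncovered.isEmpty()) {
      throw new IllegalStateException("All metric columns must be aggregated; missing configs for: " + uncovered);
    }
  }
}
```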



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());
       Object value = row.getValue(column);
-      MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex;
+      MutableForwardIndex forwardIndex = _indexContainerMap.get(metricFieldSpec.getName())._forwardIndex;
       DataType dataType = metricFieldSpec.getDataType();
-      switch (dataType) {
-        case INT:
-          forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId));
-          break;
-        case LONG:
-          forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId));
-          break;
-        case FLOAT:
-          forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId));
-          break;
-        case DOUBLE:
-          forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId));
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported data type: " + dataType + " for aggregate metric column: " + column);
+      ValueAggregator valueAggregator = _ingestionAggregator.getAggregator(metricFieldSpec.getName());
+
+      if (valueAggregator != null) {
+        switch (valueAggregator.getAggregatedValueType()) {

Review Comment:
   We should convert the result type to the forward index type if the types do not match. Users might want to store `COUNT` in an `INT` column in certain cases
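
A minimal sketch of such a conversion, assuming the aggregated result arrives as a `Number` and the column's stored type is one of the four fixed-width types. `StoredType` is a hypothetical stand-in for the column's data type, not the Pinot `DataType` enum.

```java
final class AggregatedValueConversion {
  // Hypothetical stand-in for the forward index (column) data type.
  enum StoredType { INT, LONG, FLOAT, DOUBLE }

  // Converts the aggregator's result to the type the forward index stores.
  // Note: narrowing (e.g. a LONG count into an INT column) silently overflows for large values.
  static Number toStoredType(Number aggregated, StoredType storedType) {
    switch (storedType) {
      case INT:
        return aggregated.intValue();
      case LONG:
        return aggregated.longValue();
      case FLOAT:
        return aggregated.floatValue();
      case DOUBLE:
        return aggregated.doubleValue();
      default:
        throw new UnsupportedOperationException("Unsupported stored type: " + storedType);
    }
  }
}
```

Switching on the stored type rather than on the aggregator's result type keeps the setter call and the value type in agreement, which is the mismatch this comment is about.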



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

