noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r869698932


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;

Review Comment:
   done



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java:
##########
@@ -31,7 +31,6 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 
-

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> 
_aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator 
fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || 
segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new 
HashMap<>();
+
+    for (AggregationConfig config : 
segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = 
RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());
+
+      // validation is also done when the table is created, this is just a 
sanity check.
+      Preconditions.checkState(!segmentConfig.aggregateMetrics(),

Review Comment:
   done



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions 
applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || 
!tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != 
null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the 
schema");
+          }
+          String aggregationFunction = 
aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in 
AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config 
found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = 
RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' 
for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == 
ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", 
aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", 
aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == 
ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be a identifier: %s", 
aggregationConfig);
+
+          
Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == 
null,

Review Comment:
   fixed



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> 
_aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator 
fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || 
segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {

Review Comment:
   done



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions 
applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;
+
   @JsonPropertyDescription("Config related to handling complex type")
   private final ComplexTypeConfig _complexTypeConfig;
 
   @JsonCreator
   public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable 
BatchIngestionConfig batchIngestionConfig,
       @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig 
streamIngestionConfig,
       @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
+      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> 
aggregationConfigs,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {
+    /**
+     * Currently only, ValueAggregators with fixed width types are allowed, so 
MIN, MAX, SUM, and COUNT. The reason
+     * is that only the {@link 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+     * supports random inserts and lookups. The
+     * {@link 
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex
 only supports
+     * sequential inserts.
+     */

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || 
!tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();

Review Comment:
   done, added test



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> 
_aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator 
fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || 
segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new 
HashMap<>();
+
+    for (AggregationConfig config : 
segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = 
RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());

Review Comment:
   done



##########
pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java:
##########
@@ -281,9 +284,10 @@ public void testSerDe()
       Map<String, String> prefixesToRename = new HashMap<>();
       IngestionConfig ingestionConfig =
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, 
"APPEND", "HOURLY"),
-              new StreamIngestionConfig(streamConfigMaps), new 
FilterConfig("filterFunc(foo)"), transformConfigs,
-              new ComplexTypeConfig(fieldsToUnnest, ".",
-                      
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename));
+              new StreamIngestionConfig(streamConfigMaps), new 
FilterConfig("filterFunc(foo)"), aggregationConfigs,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || 
!tableConfig.getIndexingConfig().isAggregateMetrics(),

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> 
enableMetricsAggregationIfPossible(RealtimeSegmentC
         RECORD_ID_MAP);
   }
 
+  private boolean isAggregateMetricsEnabled() {
+    return _aggregateMetrics || _ingestionAggregator.isEnabled();

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = 
_ingestionAggregator.getMetricName(metricFieldSpec.getName());
       Object value = row.getValue(column);
-      MutableForwardIndex forwardIndex = 
_indexContainerMap.get(column)._forwardIndex;
+      MutableForwardIndex forwardIndex = 
_indexContainerMap.get(metricFieldSpec.getName())._forwardIndex;
       DataType dataType = metricFieldSpec.getDataType();
-      switch (dataType) {
-        case INT:
-          forwardIndex.setInt(docId, (Integer) value + 
forwardIndex.getInt(docId));
-          break;
-        case LONG:
-          forwardIndex.setLong(docId, (Long) value + 
forwardIndex.getLong(docId));
-          break;
-        case FLOAT:
-          forwardIndex.setFloat(docId, (Float) value + 
forwardIndex.getFloat(docId));
-          break;
-        case DOUBLE:
-          forwardIndex.setDouble(docId, (Double) value + 
forwardIndex.getDouble(docId));
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported data type: " + dataType + " for aggregate metric 
column: " + column);
+      ValueAggregator valueAggregator = 
_ingestionAggregator.getAggregator(metricFieldSpec.getName());
+
+      if (valueAggregator != null) {
+        switch (valueAggregator.getAggregatedValueType()) {

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || 
!tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != 
null, "The destination column '" + columnName

Review Comment:
   done and added test



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java:
##########
@@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() {
 
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
-  private static final String STEAM_NAME = "testStream";
+  private static final String STREAM_NAME = "testStream";
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> 
_aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator 
fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || 
segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> 
_aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator 
fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || 
segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to