This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae02ecefa4 Ingestion Aggregation Feature  (#8611)
ae02ecefa4 is described below

commit ae02ecefa4b4367fc3547fa5bfb718d63b45b477
Author: noon-stripe <89272166+noon-str...@users.noreply.github.com>
AuthorDate: Tue May 17 20:09:14 2022 -0700

    Ingestion Aggregation Feature  (#8611)
    
    This feature aggregates values at ingestion time, which reduces the number 
of rows stored (thus speeding up queries), using the same strategy as the 
'aggregateMetrics' feature. This expands the feature to include COUNT, MIN, 
and MAX, with the ability to expand further.
---
 .../common/utils/config/TableConfigUtils.java      |   2 +-
 .../common/utils/config/TableConfigSerDeTest.java  |   5 +-
 .../connector/flink/http/PinotConnectionUtils.java |   6 +-
 .../flink/sink/PinotSinkIntegrationTest.java       |   2 +-
 .../pinot/controller/util/FileIngestionHelper.java |   2 +-
 .../helix/core/PinotHelixResourceManagerTest.java  |   2 +-
 .../core/retention/SegmentLineageCleanupTest.java  |   2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   4 +-
 .../apache/pinot/core/util/SchemaUtilsTest.java    |   7 +-
 .../IngestionConfigHybridIntegrationTest.java      |   3 +-
 .../tests/JsonPathClusterIntegrationTest.java      |   2 +-
 .../tests/MapTypeClusterIntegrationTest.java       |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |   2 +-
 .../SegmentWriterUploaderIntegrationTest.java      |   2 +-
 .../preprocess/DataPreprocessingHelperTest.java    |   2 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  |   2 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |   7 +-
 .../filebased/FileBasedSegmentWriterTest.java      |  20 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 245 +++++++++++++++++----
 .../local/realtime/impl/RealtimeSegmentConfig.java |  18 +-
 .../pinot/segment/local/utils/IngestionUtils.java  |  11 +
 .../segment/local/utils/TableConfigUtils.java      |  87 +++++++-
 ...MutableSegmentImplIngestionAggregationTest.java | 230 +++++++++++++++++++
 .../mutable/MutableSegmentImplTestUtils.java       |  24 +-
 .../ExpressionTransformerTest.java                 |  16 +-
 .../recordtransformer/RecordTransformerTest.java   |  40 ++--
 .../SegmentGenerationWithFilterRecordsTest.java    |   3 +-
 .../index/loader/SegmentPreProcessorTest.java      |   6 +-
 .../segment/local/utils/IngestionUtilsTest.java    |  43 +++-
 .../segment/local/utils/TableConfigUtilsTest.java  | 242 +++++++++++++++++---
 .../config/table/ingestion/AggregationConfig.java  |  49 +++++
 .../config/table/ingestion/IngestionConfig.java    |  17 +-
 .../pinot/spi/utils/IngestionConfigUtils.java      |  16 +-
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 .../pinot/spi/utils/IngestionConfigUtilsTest.java  |  10 +-
 35 files changed, 973 insertions(+), 165 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 44e0ac7f9a..e515249761 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -268,7 +268,7 @@ public class TableConfigUtils {
 
     if (ingestionConfig == null) {
       if (batchIngestionConfig != null || streamIngestionConfig != null) {
-        ingestionConfig = new IngestionConfig(batchIngestionConfig, 
streamIngestionConfig, null, null, null);
+        ingestionConfig = new IngestionConfig(batchIngestionConfig, 
streamIngestionConfig, null, null, null, null);
       }
     } else {
       if (batchIngestionConfig != null) {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 1a2e39531d..05f12d04a0 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -47,6 +47,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -267,6 +268,8 @@ public class TableConfigSerDeTest {
     }
     {
       // With ingestion config
+      List<AggregationConfig> aggregationConfigs = Lists.newArrayList(new 
AggregationConfig("SUM__bar", "SUM(bar)"),
+          new AggregationConfig("MIN_foo", "MIN(foo)"));
       List<TransformConfig> transformConfigs =
           Lists.newArrayList(new TransformConfig("bar", "func(moo)"), new 
TransformConfig("zoo", "myfunc()"));
       Map<String, String> batchConfigMap = new HashMap<>();
@@ -283,7 +286,7 @@ public class TableConfigSerDeTest {
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, 
"APPEND", "HOURLY"),
               new StreamIngestionConfig(streamConfigMaps), new 
FilterConfig("filterFunc(foo)"), transformConfigs,
               new ComplexTypeConfig(fieldsToUnnest, ".", 
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
-                  prefixesToRename));
+                  prefixesToRename), aggregationConfigs);
       TableConfig tableConfig = 
tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
 
       checkIngestionConfig(tableConfig);
diff --git 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
index cdefa67c83..500b8fed26 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
@@ -66,14 +66,14 @@ public final class PinotConnectionUtils {
     if (ingestionConfig == null) {
       tableConfig.setIngestionConfig(
           new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", 
"HOURLY"),
-              null, null, null, null));
+              null, null, null, null, null));
       return tableConfig;
     }
     if (ingestionConfig.getBatchIngestionConfig() == null) {
       tableConfig.setIngestionConfig(
           new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", 
"HOURLY"),
               null, ingestionConfig.getFilterConfig(), 
ingestionConfig.getTransformConfigs(),
-              ingestionConfig.getComplexTypeConfig()));
+              ingestionConfig.getComplexTypeConfig(), 
ingestionConfig.getAggregationConfigs()));
       return tableConfig;
     }
 
@@ -86,7 +86,7 @@ public final class PinotConnectionUtils {
         new BatchIngestionConfig(batchConfigMaps, 
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
             
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null,
         ingestionConfig.getFilterConfig(), 
ingestionConfig.getTransformConfigs(),
-        ingestionConfig.getComplexTypeConfig()));
+        ingestionConfig.getComplexTypeConfig(), 
ingestionConfig.getAggregationConfigs()));
 
     return tableConfig;
   }
diff --git 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
index 6b340a7918..a9a9086716 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
@@ -81,7 +81,7 @@ public class PinotSinkIntegrationTest extends 
BaseClusterIntegrationTest {
     batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
     IngestionConfig ingestionConfig =
         new IngestionConfig(new 
BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND", 
"HOURLY"), null,
-            null, null, null);
+            null, null, null, null);
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
             .build();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 961240edbb..1b1238c4a2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -148,7 +148,7 @@ public class FileIngestionHelper {
 
       // Upload segment
       IngestionConfig ingestionConfigOverride =
-          new IngestionConfig(batchIngestionConfigOverride, null, null, null, 
null);
+          new IngestionConfig(batchIngestionConfigOverride, null, null, null, 
null, null);
       TableConfig tableConfigOverride =
           new 
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName())
               .setIngestionConfig(ingestionConfigOverride).build();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 4cb3d5c89d..f00b717c8d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -789,7 +789,7 @@ public class PinotHelixResourceManagerTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
             
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
             .setIngestionConfig(
-                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null))
+                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null, null))
             .build();
 
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index 4a8cb0a8f8..a54515269e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -100,7 +100,7 @@ public class SegmentLineageCleanupTest {
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
 
     IngestionConfig ingestionConfig =
-        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null);
+        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null, null);
     TableConfig refreshTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
         
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
         
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 48fa30e959..31cf25a55b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1310,7 +1310,9 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             
.setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            
.setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
+            .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+            
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
+            .setNullHandlingEnabled(_nullHandlingEnabled)
             
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setHashFunction(tableConfig.getHashFunction())
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 36d9eb29b2..af3d226708 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -93,7 +93,7 @@ public class SchemaUtilsTest {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("colA", "round(colB, 1000)")),
-            null)).build();
+            null, null)).build();
     try {
       SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
       Assert.fail("Should fail schema validation, as colA is not present in 
schema");
@@ -140,9 +140,8 @@ public class SchemaUtilsTest {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setIngestionConfig(
-            new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("colA", "round(colB, 1000)")),
-                null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null,
+            Lists.newArrayList(new TransformConfig("colA", "round(colB, 
1000)")), null, null)).build();
     try {
       SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
       Assert.fail("Should fail schema validation, as colA is not present in 
schema");
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
index 5378425c4e..9fdd12e940 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
@@ -69,7 +69,8 @@ public class IngestionConfigHybridIntegrationTest extends 
BaseClusterIntegration
 
     List<Map<String, String>> streamConfigMaps = new ArrayList<>();
     streamConfigMaps.add(getStreamConfigMap());
-    return new IngestionConfig(null, new 
StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null);
+    return new IngestionConfig(null, new 
StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null,
+        null);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
index ba5bcd30eb..065cca63de 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
@@ -95,7 +95,7 @@ public class JsonPathClusterIntegrationTest extends 
BaseClusterIntegrationTest {
         new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME,
             "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')"));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
     addTableConfig(tableConfig);
 
     // Create and upload segments
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index 02e7a73964..726572a7ca 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -84,7 +84,7 @@ public class MapTypeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
         new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + 
STRING_KEY_MAP_FIELD_NAME + ")"),
         new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + 
INT_KEY_MAP_FIELD_NAME + ")"));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
     addTableConfig(tableConfig);
 
     // Create and upload segments
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 30873869e5..df672f20e8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -948,7 +948,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
         Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", 
"times(DaysSinceEpoch, 24)"),
             new TransformConfig("NewAddedDerivedSecondsSinceEpoch", 
"times(times(DaysSinceEpoch, 24), 3600)"),
-            new TransformConfig("NewAddedDerivedMVStringDimension", 
"split(DestCityName, ', ')")), null));
+            new TransformConfig("NewAddedDerivedMVStringDimension", 
"split(DestCityName, ', ')")), null, null));
     updateTableConfig(tableConfig);
 
     // Trigger reload
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
index 28f538ed08..fa67d8c62a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
@@ -90,7 +90,7 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
     batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
     batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
     return new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-        null, null, null);
+        null, null, null, null);
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index 2f97f720f8..9cbb840997 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -55,7 +55,7 @@ public class DataPreprocessingHelperTest {
     DataPreprocessingHelper dataPreprocessingHelper = new 
AvroDataPreprocessingHelper(inputPaths, outputPath);
 
     BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, 
"APPEND", "DAILY");
-    IngestionConfig ingestionConfig = new 
IngestionConfig(batchIngestionConfig, null, null, null, null);
+    IngestionConfig ingestionConfig = new 
IngestionConfig(batchIngestionConfig, null, null, null, null, null);
 
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTableName").setIngestionConfig(ingestionConfig)
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 5e503729fc..1fbfcc59d1 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -97,7 +97,7 @@ public class MergeRollupTaskGeneratorTest {
 
     // Skip task generation, if REFRESH table
     IngestionConfig ingestionConfig =
-        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null), 
null, null, null, null);
+        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null), 
null, null, null, null, null);
     offlineTableConfig = getOfflineTableConfig(new HashMap<>());
     offlineTableConfig.setIngestionConfig(ingestionConfig);
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 4fcdbf6824..be3118a51b 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -100,13 +100,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
             .setSortedColumn(D1).build();
     TableConfig tableConfigEpochHours =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
-            .setSortedColumn(D1).setIngestionConfig(
-            new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig(T_TRX, "toEpochHours(t)")),
-                null)).build();
+            .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, 
null, null,
+                Lists.newArrayList(new TransformConfig(T_TRX, 
"toEpochHours(t)")), null, null)).build();
     TableConfig tableConfigSDF =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
             .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, 
null, null,
-            Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 
'yyyyMMddHH')")), null)).build();
+                Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 
'yyyyMMddHH')")), null, null)).build();
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
             .addMetric(M1, FieldSpec.DataType.INT)
diff --git 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
index 73833787e2..f48eb2195a 100644
--- 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
+++ 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
@@ -76,7 +76,7 @@ public class FileBasedSegmentWriterTest {
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
     _ingestionConfig =
         new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-            null, transformConfigs, null);
+            null, transformConfigs, null, null);
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(_ingestionConfig)
             .setTimeColumnName(TIME_COLUMN_NAME).build();
@@ -112,7 +112,7 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null, 
null));
+    tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null, 
null, null));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchIngestionConfig");
@@ -121,7 +121,7 @@ public class FileBasedSegmentWriterTest {
     }
 
     tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null));
+        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchConfigMaps");
@@ -131,7 +131,7 @@ public class FileBasedSegmentWriterTest {
 
     tableConfig.setIngestionConfig(
         new IngestionConfig(new BatchIngestionConfig(Collections.emptyList(), 
"APPEND", "HOURLY"), null, null, null,
-            null));
+            null, null));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchConfigMaps");
@@ -141,7 +141,7 @@ public class FileBasedSegmentWriterTest {
 
     tableConfig.setIngestionConfig(
         new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(Collections.emptyMap()), "APPEND", 
"HOURLY"),
-            null, null, null, null));
+            null, null, null, null, null));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing outputDirURI in batchConfigMap");
@@ -153,7 +153,7 @@ public class FileBasedSegmentWriterTest {
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
     tableConfig.setIngestionConfig(
         new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-            null, null, null));
+            null, null, null, null));
     segmentWriter.init(tableConfig, _schema);
     segmentWriter.close();
   }
@@ -255,7 +255,7 @@ public class FileBasedSegmentWriterTest {
             .setIngestionConfig(new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
                 
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
                 
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-                _ingestionConfig.getTransformConfigs(), null)).build();
+                _ingestionConfig.getTransformConfigs(), null, null)).build();
 
     SegmentWriter segmentWriter = new FileBasedSegmentWriter();
     segmentWriter.init(tableConfig, _schema);
@@ -281,7 +281,7 @@ public class FileBasedSegmentWriterTest {
         new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
             
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
             
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-        _ingestionConfig.getTransformConfigs(), null));
+        _ingestionConfig.getTransformConfigs(), null, null));
     segmentWriter.init(tableConfig, _schema);
 
     // write 2 records
@@ -305,7 +305,7 @@ public class FileBasedSegmentWriterTest {
         new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
             
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
             
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-        _ingestionConfig.getTransformConfigs(), null));
+        _ingestionConfig.getTransformConfigs(), null, null));
     segmentWriter.init(tableConfig, _schema);
 
     // write 2 records
@@ -338,7 +338,7 @@ public class FileBasedSegmentWriterTest {
             .setIngestionConfig(new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
                 
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
                 
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-                _ingestionConfig.getTransformConfigs(), null)).build();
+                _ingestionConfig.getTransformConfigs(), null, null)).build();
     segmentWriter.init(tableConfig, _schema);
 
     // write 3 records with same timestamp
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index ecab67ed5d..a27be0b2f7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -34,9 +34,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
@@ -56,6 +63,8 @@ import 
org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.local.utils.IdMap;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -79,6 +88,7 @@ import 
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -127,7 +137,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, IndexContainer> _indexContainerMap = new 
HashMap<>();
 
   private final IdMap<FixedIntArray> _recordIdMap;
-  private boolean _aggregateMetrics;
 
   private volatile int _numDocsIndexed = 0;
   private final int _numKeyColumns;
@@ -196,7 +205,6 @@ public class MutableSegmentImpl implements MutableSegment {
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
-    _aggregateMetrics = config.aggregateMetrics();
 
     Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
     List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -237,6 +245,15 @@ public class MutableSegmentImpl implements MutableSegment {
 
     int avgNumMultiValues = config.getAvgNumMultiValues();
 
+    // Metric aggregation can be enabled only if config is specified, and all 
dimensions have dictionary,
+    // and no metrics have dictionary. If not enabled, the map returned is 
null.
+    _recordIdMap = enableMetricsAggregationIfPossible(config, 
noDictionaryColumns);
+
+    Map<String, Pair<String, ValueAggregator>> metricsAggregators = 
Collections.emptyMap();
+    if (_recordIdMap != null) {
+      metricsAggregators = getMetricsAggregators(config);
+    }
+
     // Initialize for each column
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
       String column = fieldSpec.getName();
@@ -336,10 +353,16 @@ public class MutableSegmentImpl implements MutableSegment 
{
       // Null value vector
       MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new 
MutableNullValueVector() : null;
 
+      Pair<String, ValueAggregator> columnAggregatorPair =
+          metricsAggregators.getOrDefault(column, Pair.of(column, null));
+      String sourceColumn = columnAggregatorPair.getLeft();
+      ValueAggregator valueAggregator = columnAggregatorPair.getRight();
+
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
           new IndexContainer(fieldSpec, partitionFunction, partitions, new 
NumValuesInfo(), forwardIndex, dictionary,
-              invertedIndexReader, null, textIndex, jsonIndex, h3Index, null, 
nullValueVector));
+              invertedIndexReader, null, textIndex, jsonIndex, h3Index, null, 
nullValueVector, sourceColumn,
+              valueAggregator));
     }
 
     // TODO separate concerns: this logic does not belong here
@@ -349,14 +372,11 @@ public class MutableSegmentImpl implements MutableSegment 
{
       
realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders);
     }
 
-    // Metric aggregation can be enabled only if config is specified, and all 
dimensions have dictionary,
-    // and no metrics have dictionary. If not enabled, the map returned is 
null.
-    _recordIdMap = enableMetricsAggregationIfPossible(config, 
noDictionaryColumns);
-
     // init upsert-related data structure
     _upsertMode = config.getUpsertMode();
     if (isUpsertEnabled()) {
-      Preconditions.checkState(!_aggregateMetrics, "Metrics aggregation and 
upsert cannot be enabled together");
+      Preconditions.checkState(!isAggregateMetricsEnabled(),
+          "Metrics aggregation and upsert cannot be enabled together");
       _partitionUpsertMetadataManager = 
config.getPartitionUpsertMetadataManager();
       _validDocIds = new ThreadSafeMutableRoaringBitmap();
       String upsertComparisonColumn = config.getUpsertComparisonColumn();
@@ -390,7 +410,8 @@ public class MutableSegmentImpl implements MutableSegment {
       // of noDict on STRING/BYTES. Without metrics aggregation, memory 
pressure increases.
       // So to continue aggregating metrics for such cases, we will create 
dictionary even
       // if the column is part of noDictionary set from table config
-      if (fieldSpec instanceof DimensionFieldSpec && _aggregateMetrics && 
(dataType == STRING || dataType == BYTES)) {
+      if (fieldSpec instanceof DimensionFieldSpec && 
isAggregateMetricsEnabled() && (dataType == STRING
+          || dataType == BYTES)) {
         _logger.info(
             "Aggregate metrics is enabled. Will create dictionary in consuming 
segment for column {} of type {}",
             column, dataType.toString());
@@ -478,7 +499,7 @@ public class MutableSegmentImpl implements MutableSegment {
         // Update number of documents indexed at last to make the latest row 
queryable
         canTakeMore = numDocsIndexed++ < _capacity;
       } else {
-        assert _aggregateMetrics;
+        assert isAggregateMetricsEnabled();
         aggregateMetrics(row, docId);
         canTakeMore = true;
       }
@@ -508,13 +529,16 @@ public class MutableSegmentImpl implements MutableSegment 
{
 
   private void updateDictionary(GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : 
_indexContainerMap.entrySet()) {
-      String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
-      Object value = row.getValue(column);
       MutableDictionary dictionary = indexContainer._dictionary;
+      if (dictionary == null) {
+        continue;
+      }
+
+      Object value = row.getValue(entry.getKey());
       if (value == null) {
         recordIndexingError("DICTIONARY");
-      } else if (dictionary != null) {
+      } else {
         if (indexContainer._fieldSpec.isSingleValueField()) {
           indexContainer._dictId = dictionary.index(value);
         } else {
@@ -533,6 +557,38 @@ public class MutableSegmentImpl implements MutableSegment {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
 
+      // aggregate metrics is enabled.
+      if (indexContainer._valueAggregator != null) {
+        Object value = row.getValue(indexContainer._sourceColumn);
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        FieldSpec fieldSpec = indexContainer._fieldSpec;
+
+        DataType dataType = fieldSpec.getDataType();
+        value = 
indexContainer._valueAggregator.getInitialAggregatedValue(value);
+        switch (dataType.getStoredType()) {
+          case INT:
+            forwardIndex.setInt(docId, ((Number) value).intValue());
+            break;
+          case LONG:
+            forwardIndex.setLong(docId, ((Number) value).longValue());
+            break;
+          case FLOAT:
+            forwardIndex.setFloat(docId, ((Number) value).floatValue());
+            break;
+          case DOUBLE:
+            forwardIndex.setDouble(docId, ((Number) value).doubleValue());
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported data type: " + dataType + " for aggregation: " + 
column);
+        }
+        continue;
+      }
+
       // Update the null value vector even if a null value is somehow produced
       if (_nullHandlingEnabled && row.isNullValue(column)) {
         indexContainer._nullValueVector.setNull(docId);
@@ -612,7 +668,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
           // Update min/max value from raw value
           // NOTE: Skip updating min/max value for aggregated metrics because 
the value will change over time.
-          if (!_aggregateMetrics || fieldSpec.getFieldType() != 
FieldSpec.FieldType.METRIC) {
+          if (!isAggregateMetricsEnabled() || fieldSpec.getFieldType() != 
FieldSpec.FieldType.METRIC) {
             Comparable comparable;
             if (dataType == BYTES) {
               comparable = new ByteArray((byte[]) value);
@@ -706,26 +762,75 @@ public class MutableSegmentImpl implements MutableSegment 
{
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
-      Object value = row.getValue(column);
-      MutableForwardIndex forwardIndex = 
_indexContainerMap.get(column)._forwardIndex;
+      IndexContainer indexContainer = 
_indexContainerMap.get(metricFieldSpec.getName());
+      Object value = row.getValue(indexContainer._sourceColumn);
+      MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
       DataType dataType = metricFieldSpec.getDataType();
-      switch (dataType) {
-        case INT:
-          forwardIndex.setInt(docId, (Integer) value + 
forwardIndex.getInt(docId));
+
+      Double oldDoubleValue;
+      Double newDoubleValue;
+      Long oldLongValue;
+      Long newLongValue;
+      ValueAggregator valueAggregator = indexContainer._valueAggregator;
+      switch (valueAggregator.getAggregatedValueType()) {
+        case DOUBLE:
+          switch (dataType) {
+            case INT:
+              oldDoubleValue = ((Integer) 
forwardIndex.getInt(docId)).doubleValue();
+              newDoubleValue = (Double) 
valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setInt(docId, newDoubleValue.intValue());
+              break;
+            case LONG:
+              oldDoubleValue = ((Long) 
forwardIndex.getLong(docId)).doubleValue();
+              newDoubleValue = (Double) 
valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setLong(docId, newDoubleValue.longValue());
+              break;
+            case FLOAT:
+              oldDoubleValue = ((Float) 
forwardIndex.getFloat(docId)).doubleValue();
+              newDoubleValue = (Double) 
valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setFloat(docId, newDoubleValue.floatValue());
+              break;
+            case DOUBLE:
+              oldDoubleValue = forwardIndex.getDouble(docId);
+              newDoubleValue = (Double) 
valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setDouble(docId, newDoubleValue);
+              break;
+            default:
+              throw new 
UnsupportedOperationException(String.format("Aggregation type %s of %s not 
supported for %s",
+                  valueAggregator.getAggregatedValueType(), 
valueAggregator.getAggregationType(), dataType));
+          }
           break;
         case LONG:
-          forwardIndex.setLong(docId, (Long) value + 
forwardIndex.getLong(docId));
-          break;
-        case FLOAT:
-          forwardIndex.setFloat(docId, (Float) value + 
forwardIndex.getFloat(docId));
-          break;
-        case DOUBLE:
-          forwardIndex.setDouble(docId, (Double) value + 
forwardIndex.getDouble(docId));
+          switch (dataType) {
+            case INT:
+              oldLongValue = ((Integer) 
forwardIndex.getInt(docId)).longValue();
+              newLongValue = (Long) 
valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setInt(docId, newLongValue.intValue());
+              break;
+            case LONG:
+              oldLongValue = forwardIndex.getLong(docId);
+              newLongValue = (Long) 
valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setLong(docId, newLongValue);
+              break;
+            case FLOAT:
+              oldLongValue = ((Float) 
forwardIndex.getFloat(docId)).longValue();
+              newLongValue = (Long) 
valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setFloat(docId, newLongValue.floatValue());
+              break;
+            case DOUBLE:
+              oldLongValue = ((Double) 
forwardIndex.getDouble(docId)).longValue();
+              newLongValue = (Long) 
valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setDouble(docId, newLongValue.doubleValue());
+              break;
+            default:
+              throw new 
UnsupportedOperationException(String.format("Aggregation type %s of %s not 
supported for %s",
+                  valueAggregator.getAggregatedValueType(), 
valueAggregator.getAggregationType(), dataType));
+          }
           break;
         default:
           throw new UnsupportedOperationException(
-              "Unsupported data type: " + dataType + " for aggregate metric 
column: " + column);
+              String.format("Aggregation type %s of %s not supported for %s", 
valueAggregator.getAggregatedValueType(),
+                  valueAggregator.getAggregationType(), dataType));
       }
     }
   }
@@ -969,7 +1074,7 @@ public class MutableSegmentImpl implements MutableSegment {
    *
    * */
   private int getOrCreateDocId() {
-    if (!_aggregateMetrics) {
+    if (!isAggregateMetricsEnabled()) {
       return _numDocsIndexed;
     }
 
@@ -1005,7 +1110,7 @@ public class MutableSegmentImpl implements MutableSegment 
{
    */
   private IdMap<FixedIntArray> 
enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,
       Set<String> noDictionaryColumns) {
-    if (!_aggregateMetrics) {
+    if (!config.aggregateMetrics() && 
CollectionUtils.isEmpty(config.getIngestionAggregationConfigs())) {
       _logger.info("Metrics aggregation is disabled.");
       return null;
     }
@@ -1017,15 +1122,13 @@ public class MutableSegmentImpl implements 
MutableSegment {
       if (!noDictionaryColumns.contains(metric)) {
         _logger.warn("Metrics aggregation cannot be turned ON in presence of 
dictionary encoded metrics, eg: {}",
             metric);
-        _aggregateMetrics = false;
-        break;
+        return null;
       }
 
       if (!fieldSpec.isSingleValueField()) {
         _logger.warn("Metrics aggregation cannot be turned ON in presence of 
multi-value metric columns, eg: {}",
             metric);
-        _aggregateMetrics = false;
-        break;
+        return null;
       }
     }
 
@@ -1036,15 +1139,13 @@ public class MutableSegmentImpl implements 
MutableSegment {
       if (noDictionaryColumns.contains(dimension)) {
         _logger.warn("Metrics aggregation cannot be turned ON in presence of 
no-dictionary dimensions, eg: {}",
             dimension);
-        _aggregateMetrics = false;
-        break;
+        return null;
       }
 
       if (!fieldSpec.isSingleValueField()) {
         _logger.warn("Metrics aggregation cannot be turned ON in presence of 
multi-value dimension columns, eg: {}",
             dimension);
-        _aggregateMetrics = false;
-        break;
+        return null;
       }
     }
 
@@ -1054,15 +1155,10 @@ public class MutableSegmentImpl implements 
MutableSegment {
         _logger.warn(
             "Metrics aggregation cannot be turned ON in presence of 
no-dictionary datetime/time columns, eg: {}",
             timeColumnName);
-        _aggregateMetrics = false;
-        break;
+        return null;
       }
     }
 
-    if (!_aggregateMetrics) {
-      return null;
-    }
-
     int estimatedRowsToIndex;
     if (_statsHistory.isEmpty()) {
       // Choose estimated rows to index as maxNumRowsPerSegment / 
EXPECTED_COMPRESSION (1000, to be conservative in
@@ -1082,6 +1178,10 @@ public class MutableSegmentImpl implements 
MutableSegment {
         RECORD_ID_MAP);
   }
 
+  private boolean isAggregateMetricsEnabled() {
+    return _recordIdMap != null;
+  }
+
   // NOTE: Okay for single-writer
   @SuppressWarnings("NonAtomicOperationOnVolatileField")
   private static class NumValuesInfo {
@@ -1098,6 +1198,56 @@ public class MutableSegmentImpl implements 
MutableSegment {
     }
   }
 
+  private static Map<String, Pair<String, ValueAggregator>> 
getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig.aggregateMetrics()) {
+      return fromAggregateMetrics(segmentConfig);
+    } else if 
(!CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
+      return fromAggregationConfig(segmentConfig);
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  private static Map<String, Pair<String, ValueAggregator>> 
fromAggregateMetrics(RealtimeSegmentConfig segmentConfig) {
+    
Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()),
+        "aggregateMetrics cannot be enabled if AggregationConfig is set");
+
+    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new 
HashMap<>();
+    for (String metricName : segmentConfig.getSchema().getMetricNames()) {
+      columnNameToAggregator.put(metricName,
+          Pair.of(metricName, 
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM)));
+    }
+    return columnNameToAggregator;
+  }
+
+  private static Map<String, Pair<String, ValueAggregator>> 
fromAggregationConfig(RealtimeSegmentConfig segmentConfig) {
+    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new 
HashMap<>();
+
+    Preconditions.checkState(!segmentConfig.aggregateMetrics(),
+        "aggregateMetrics cannot be enabled if AggregationConfig is set");
+    for (AggregationConfig config : 
segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = 
RequestContextUtils.getExpression(config.getAggregationFunction());
+      // validation is also done when the table is created, this is just a 
sanity check.
+      Preconditions.checkState(expressionContext.getType() == 
ExpressionContext.Type.FUNCTION,
+          "aggregation function must be a function: %s", config);
+      FunctionContext functionContext = expressionContext.getFunction();
+      
TableConfigUtils.validateIngestionAggregation(functionContext.getFunctionName());
+      Preconditions.checkState(functionContext.getArguments().size() == 1,
+          "aggregation function can only have one argument: %s", config);
+      ExpressionContext argument = functionContext.getArguments().get(0);
+      Preconditions.checkState(argument.getType() == 
ExpressionContext.Type.IDENTIFIER,
+          "aggregator function argument must be a identifier: %s", config);
+
+      AggregationFunctionType functionType =
+          
AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
+
+      columnNameToAggregator.put(config.getColumnName(),
+          Pair.of(argument.getLiteral(), 
ValueAggregatorFactory.getValueAggregator(functionType)));
+    }
+
+    return columnNameToAggregator;
+  }
+
   private class IndexContainer implements Closeable {
     final FieldSpec _fieldSpec;
     final PartitionFunction _partitionFunction;
@@ -1112,6 +1262,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
     final MutableJsonIndex _jsonIndex;
     final BloomFilterReader _bloomFilter;
     final MutableNullValueVector _nullValueVector;
+    final String _sourceColumn;
+    final ValueAggregator _valueAggregator;
 
     volatile Comparable _minValue;
     volatile Comparable _maxValue;
@@ -1125,7 +1277,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex 
invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex 
textIndex,
         @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index 
h3Index, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+        @Nullable MutableNullValueVector nullValueVector, @Nullable String 
sourceColumn,
+        @Nullable ValueAggregator valueAggregator) {
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
       _partitions = partitions;
@@ -1140,6 +1293,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
       _jsonIndex = jsonIndex;
       _bloomFilter = bloomFilter;
       _nullValueVector = nullValueVector;
+      _sourceColumn = sourceColumn;
+      _valueAggregator = valueAggregator;
     }
 
     DataSource toDataSource() {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 5ac3189ea0..dcc89db811 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -64,6 +65,7 @@ public class RealtimeSegmentConfig {
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final String _consumerDir;
   private final List<FieldConfig> _fieldConfigList;
+  private final List<AggregationConfig> _ingestionAggregationConfigs;
 
   // TODO: Clean up this constructor. Most of these things can be extracted 
from tableConfig.
   private RealtimeSegmentConfig(String tableNameWithType, String segmentName, 
String streamName, Schema schema,
@@ -74,7 +76,8 @@ public class RealtimeSegmentConfig {
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, 
PartitionFunction partitionFunction,
       int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, 
String consumerDir,
       UpsertConfig.Mode upsertMode, String upsertComparisonColumn, 
UpsertConfig.HashFunction hashFunction,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
List<FieldConfig> fieldConfigList) {
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
List<FieldConfig> fieldConfigList,
+      List<AggregationConfig> ingestionAggregationConfigs) {
     _tableNameWithType = tableNameWithType;
     _segmentName = segmentName;
     _streamName = streamName;
@@ -104,6 +107,7 @@ public class RealtimeSegmentConfig {
     _upsertComparisonColumn = upsertComparisonColumn;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _fieldConfigList = fieldConfigList;
+    _ingestionAggregationConfigs = ingestionAggregationConfigs;
   }
 
   public String getTableNameWithType() {
@@ -227,6 +231,10 @@ public class RealtimeSegmentConfig {
     return _fieldConfigList;
   }
 
+  public List<AggregationConfig> getIngestionAggregationConfigs() {
+    return _ingestionAggregationConfigs;
+  }
+
   public static class Builder {
     private String _tableNameWithType;
     private String _segmentName;
@@ -257,6 +265,7 @@ public class RealtimeSegmentConfig {
     private String _upsertComparisonColumn;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
     private List<FieldConfig> _fieldConfigList;
+    private List<AggregationConfig> _ingestionAggregationConfigs;
 
     public Builder() {
     }
@@ -414,13 +423,18 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setIngestionAggregationConfigs(List<AggregationConfig> 
ingestionAggregationConfigs) {
+      _ingestionAggregationConfigs = ingestionAggregationConfigs;
+      return this;
+    }
+
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, 
_streamName, _schema, _timeColumnName,
           _capacity, _avgNumMultiValues, _noDictionaryColumns, 
_varLengthDictionaryColumns, _invertedIndexColumns,
           _textIndexColumns, _fstIndexColumns, _jsonIndexColumns, 
_h3IndexConfigs, _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, 
_partitionId, _aggregateMetrics,
           _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertComparisonColumn, _hashFunction,
-          _partitionUpsertMetadataManager, _fieldConfigList);
+          _partitionUpsertMetadataManager, _fieldConfigList, 
_ingestionAggregationConfigs);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index eea20f0687..1a4695f9a0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -30,6 +30,8 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
@@ -41,6 +43,7 @@ import 
org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -361,6 +364,14 @@ public final class IngestionUtils {
           fields.addAll(functionEvaluator.getArguments());
         }
       }
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      if (aggregationConfigs != null) {
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          ExpressionContext expressionContext =
+              
RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          expressionContext.getColumns(fields);
+        }
+      }
       List<TransformConfig> transformConfigs = 
ingestionConfig.getTransformConfigs();
       if (transformConfigs != null) {
         for (TransformConfig transformConfig : transformConfigs) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index fd831e70d1..381c74dcb5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,10 +35,14 @@ import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -50,6 +55,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -89,6 +95,9 @@ public final class TableConfigUtils {
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use 
KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module 
as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
+  private static final EnumSet<AggregationFunctionType> 
SUPPORTED_INGESTION_AGGREGATIONS =
+      EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN, 
AggregationFunctionType.MAX,
+          AggregationFunctionType.COUNT);
 
   /**
    * @see TableConfigUtils#validate(TableConfig, Schema, String, boolean)
@@ -306,6 +315,60 @@ public final class TableConfigUtils {
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = 
ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (!CollectionUtils.isEmpty(aggregationConfigs)) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          String aggregationFunction = 
aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in 
AggregationConfig " + aggregationConfig);
+          }
+
+          if (schema != null) {
+            FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+            Preconditions.checkState(fieldSpec != null, "The destination 
column '" + columnName
+                + "' of the aggregation function must be present in the 
schema");
+            Preconditions.checkState(fieldSpec.getFieldType() == 
FieldSpec.FieldType.METRIC,
+                "The destination column '" + columnName + "' of the 
aggregation function must be a metric column");
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config 
found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = 
RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' 
for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == 
ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", 
aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", 
aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == 
ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be a identifier: %s", 
aggregationConfig);
+
+          aggregationSourceColumns.add(argument.getIdentifier());
+        }
+        if (schema != null) {
+          Preconditions.checkState(new 
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
+              "all metric columns must be aggregated");
+        }
+      }
+
       // Transform configs
       List<TransformConfig> transformConfigs = 
ingestionConfig.getTransformConfigs();
       if (transformConfigs != null) {
@@ -313,8 +376,11 @@ public final class TableConfigUtils {
         for (TransformConfig transformConfig : transformConfigs) {
           String columnName = transformConfig.getColumnName();
           if (schema != null) {
-            Preconditions.checkState(schema.getFieldSpecFor(columnName) != 
null,
-                "The destination column '" + columnName + "' of the transform 
function must be present in the schema");
+            Preconditions.checkState(
+                schema.getFieldSpecFor(columnName) != null || 
aggregationSourceColumns.contains(columnName),
+                "The destination column '" + columnName
+                    + "' of the transform function must be present in the 
schema or as a source column for "
+                    + "aggregations");
           }
           String transformFunction = transformConfig.getTransformFunction();
           if (columnName == null || transformFunction == null) {
@@ -363,6 +429,23 @@ public final class TableConfigUtils {
     }
   }
 
+  /**
+   * Currently, only ValueAggregators with fixed width types are allowed, so 
MIN, MAX, SUM, and COUNT. The reason
+   * is that only the {@link 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+   * supports random inserts and lookups. The
+   * {@link 
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex}
 only supports
+   * sequential inserts.
+   */
+  public static void validateIngestionAggregation(String name) {
+    for (AggregationFunctionType functionType : 
SUPPORTED_INGESTION_AGGREGATIONS) {
+      if (functionType.getName().equals(name)) {
+        return;
+      }
+    }
+    throw new IllegalStateException(
+        String.format("aggregation function %s must be one of %s", name, 
SUPPORTED_INGESTION_AGGREGATIONS));
+  }
+
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
new file mode 100644
index 0000000000..5c5e2beeec
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MutableSegmentImplIngestionAggregationTest {
+  private static final String DIMENSION_1 = "dim1";
+  private static final String DIMENSION_2 = "dim2";
+
+  private static final String METRIC = "metric";
+  private static final String METRIC_2 = "metric_2";
+
+  private static final String TIME_COLUMN1 = "time1";
+  private static final String TIME_COLUMN2 = "time2";
+  private static final String KEY_SEPARATOR = "\t\t";
+  private static final int NUM_ROWS = 10001;
+
+  private static final Schema.SchemaBuilder getSchemaBuilder() {
+    return new Schema.SchemaBuilder().setSchemaName("testSchema")
+        .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN1, FieldSpec.DataType.INT, "1:DAYS:EPOCH", 
"1:DAYS")
+        .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH", 
"1:HOURS");
+  }
+
+  private static final Set<String> VAR_LENGTH_SET = 
Collections.singleton(DIMENSION_2);
+  private static final Set<String> INVERTED_INDEX_SET =
+      new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, 
TIME_COLUMN2));
+
+  private static final List<String> STRING_VALUES =
+      Collections.unmodifiableList(Arrays.asList("aa", "bbb", "cc", "ddd", 
"ee", "fff", "gg", "hhh", "ii", "jjj"));
+
+  @Test
+  public void testSameSrcDifferentAggregations()
+      throws Exception {
+    String m1 = "metric_MAX";
+    String m2 = "metric_MIN";
+
+    Schema schema =
+        getSchemaBuilder().addMetric(m2, 
FieldSpec.DataType.DOUBLE).addMetric(m1, FieldSpec.DataType.DOUBLE).build();
+    MutableSegmentImpl mutableSegmentImpl =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new 
HashSet<>(Arrays.asList(m2, m1)),
+            VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            Arrays.asList(new AggregationConfig(m1, "MAX(metric)"), new 
AggregationConfig(m2, "MIN(metric)")));
+
+    Map<String, Double> expectedMin = new HashMap<>();
+    Map<String, Double> expectedMax = new HashMap<>();
+    for (List<Metric> metrics : addRows(1, mutableSegmentImpl)) {
+      expectedMin.put(metrics.get(0).getKey(),
+          Math.min(expectedMin.getOrDefault(metrics.get(0).getKey(), 
Double.POSITIVE_INFINITY),
+              metrics.get(0).getValue()));
+      expectedMax.put(metrics.get(0).getKey(),
+          Math.max(expectedMax.getOrDefault(metrics.get(0).getKey(), 
Double.NEGATIVE_INFINITY),
+              metrics.get(0).getValue()));
+    }
+
+    GenericRow reuse = new GenericRow();
+    for (int docId = 0; docId < expectedMax.size(); docId++) {
+      GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
+      String key = buildKey(row);
+      Assert.assertEquals(row.getValue(m2), expectedMin.get(key), key);
+      Assert.assertEquals(row.getValue(m1), expectedMax.get(key), key);
+    }
+
+    mutableSegmentImpl.destroy();
+  }
+
+  @Test
+  public void testSameAggregationDifferentSrc()
+      throws Exception {
+    String m1 = "sum1";
+    String m2 = "sum2";
+
+    Schema schema =
+        getSchemaBuilder().addMetric(m1, FieldSpec.DataType.INT).addMetric(m2, 
FieldSpec.DataType.LONG).build();
+    MutableSegmentImpl mutableSegmentImpl =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new 
HashSet<>(Arrays.asList(m2, m1)),
+            VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            Arrays.asList(new AggregationConfig(m1, "SUM(metric)"), new 
AggregationConfig(m2, "SUM(metric_2)")));
+
+    Map<String, Integer> expectedSum1 = new HashMap<>();
+    Map<String, Long> expectedSum2 = new HashMap<>();
+    for (List<Metric> metrics : addRows(2, mutableSegmentImpl)) {
+      expectedSum1.put(metrics.get(0).getKey(),
+          expectedSum1.getOrDefault(metrics.get(0).getKey(), 0) + 
metrics.get(0).getValue());
+      expectedSum2.put(metrics.get(1).getKey(),
+          expectedSum2.getOrDefault(metrics.get(1).getKey(), 0L) + 
metrics.get(1).getValue().longValue());
+    }
+
+    GenericRow reuse = new GenericRow();
+    for (int docId = 0; docId < expectedSum1.size(); docId++) {
+      GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
+      String key = buildKey(row);
+      Assert.assertEquals(row.getValue(m1), expectedSum1.get(key), key);
+      Assert.assertEquals(row.getValue(m2), expectedSum2.get(key), key);
+    }
+
+    mutableSegmentImpl.destroy();
+  }
+
+  @Test
+  public void testCOUNT()
+      throws Exception {
+    String m1 = "count1";
+    String m2 = "count2";
+
+    Schema schema =
+        getSchemaBuilder().addMetric(m1, 
FieldSpec.DataType.LONG).addMetric(m2, FieldSpec.DataType.LONG).build();
+    MutableSegmentImpl mutableSegmentImpl =
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new 
HashSet<>(Arrays.asList(m1, m2)),
+            VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            Arrays.asList(new AggregationConfig(m1, "COUNT(metric)"), new 
AggregationConfig(m2, "COUNT(*)")));
+
+    Map<String, Long> expectedCount = new HashMap<>();
+    for (List<Metric> metrics : addRows(3, mutableSegmentImpl)) {
+      expectedCount.put(metrics.get(0).getKey(),
+          expectedCount.getOrDefault(metrics.get(0).getKey(), 0L) + 1L);
+    }
+
+    GenericRow reuse = new GenericRow();
+    for (int docId = 0; docId < expectedCount.size(); docId++) {
+      GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
+      String key = buildKey(row);
+      Assert.assertEquals(row.getValue(m1), expectedCount.get(key), key);
+      Assert.assertEquals(row.getValue(m2), expectedCount.get(key), key);
+    }
+
+    mutableSegmentImpl.destroy();
+  }
+
+  private String buildKey(GenericRow row) {
+    return row.getValue(DIMENSION_1) + KEY_SEPARATOR + 
row.getValue(DIMENSION_2) + KEY_SEPARATOR + row.getValue(
+        TIME_COLUMN1) + KEY_SEPARATOR + row.getValue(TIME_COLUMN2);
+  }
+
+  private GenericRow getRow(Random random) {
+    GenericRow row = new GenericRow();
+
+    row.putValue(DIMENSION_1, random.nextInt(10));
+    row.putValue(DIMENSION_2, 
STRING_VALUES.get(random.nextInt(STRING_VALUES.size())));
+    row.putValue(TIME_COLUMN1, random.nextInt(10));
+    row.putValue(TIME_COLUMN2, random.nextInt(5));
+
+    return row;
+  }
+
+  private class Metric {
+    private final String _key;
+    private final Integer _value;
+
+    Metric(String key, Integer value) {
+      _key = key;
+      _value = value;
+    }
+
+    public String getKey() {
+      return _key;
+    }
+
+    public Integer getValue() {
+      return _value;
+    }
+  }
+
+  private List<List<Metric>> addRows(long seed, MutableSegmentImpl 
mutableSegmentImpl)
+      throws Exception {
+    List<List<Metric>> metrics = new ArrayList<>();
+    Set<String> keys = new HashSet<>();
+
+
+    Random random = new Random(seed);
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = getRow(random);
+      // This needs to be relatively low since it will tend to overflow with 
the Int-to-Double conversion.
+      Integer metricValue = random.nextInt(10000);
+      Integer metric2Value = random.nextInt();
+      row.putValue(METRIC, metricValue);
+      row.putValue(METRIC_2, metric2Value);
+
+      mutableSegmentImpl.index(row, defaultMetadata);
+
+      String key = buildKey(row);
+      metrics.add(Arrays.asList(new Metric(key, metricValue), new Metric(key, 
metric2Value)));
+      keys.add(key);
+    }
+
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+    Assert.assertEquals(numDocsIndexed, keys.size());
+
+    // Assert that aggregation happened.
+    Assert.assertTrue(numDocsIndexed < NUM_ROWS);
+
+    return metrics;
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index e715cdd22a..20d15334e4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -27,6 +28,7 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.Schema;
 
 import static org.mockito.Mockito.anyString;
@@ -40,7 +42,14 @@ public class MutableSegmentImplTestUtils {
 
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
-  private static final String STEAM_NAME = "testStream";
+  private static final String STREAM_NAME = "testStream";
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
+      List<AggregationConfig> preAggregationConfigs) {
+    return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
+        Collections.emptySet(), false, false, null, null, null, null, 
preAggregationConfigs);
+  }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, boolean aggregateMetrics) {
@@ -61,20 +70,22 @@ public class MutableSegmentImplTestUtils {
       PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
         Collections.emptySet(), aggregateMetrics, nullHandlingEnabled, 
upsertConfig, timeColumnName,
-        partitionUpsertMetadataManager, null);
+        partitionUpsertMetadataManager, null, Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, Set<String> jsonIndexColumns,
       ServerMetrics serverMetrics) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
-        jsonIndexColumns, false, true, null, null, null, serverMetrics);
+        jsonIndexColumns, false, true, null, null, null, serverMetrics, 
Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, Set<String> jsonIndexColumns,
       boolean aggregateMetrics, boolean nullHandlingEnabled, UpsertConfig 
upsertConfig, String timeColumnName,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
ServerMetrics serverMetrics) {
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
ServerMetrics serverMetrics,
+      List<AggregationConfig> aggregationConfigs) {
+
     RealtimeSegmentStatsHistory statsHistory = 
mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -85,14 +96,15 @@ public class MutableSegmentImplTestUtils {
         upsertConfig == null ? UpsertConfig.HashFunction.NONE : 
upsertConfig.getHashFunction();
     RealtimeSegmentConfig realtimeSegmentConfig =
         new 
RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
-            
.setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
+            
.setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
             
.setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns).setJsonIndexColumns(jsonIndexColumns)
             
.setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
             .setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
             .setMemoryManager(new 
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
             
.setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction)
-            
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
+            .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
+            .setIngestionAggregationConfigs(aggregationConfigs).build();
     return new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 2f38f39a02..3415a46995 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -61,7 +61,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("map2_values", 
"Groovy({map2.sort()*.value}, map2)"));
     transformConfigs.add(new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
 
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
     DataTypeTransformer dataTypeTransformer = new 
DataTypeTransformer(pinotSchema);
@@ -150,7 +150,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+' 
'+lastName}, firstName, lastName)"));
     transformConfigs.add(new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
 
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
 
@@ -204,7 +204,7 @@ public class ExpressionTransformerTest {
     List<TransformConfig> transformConfigs = new ArrayList<>();
     transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName + 
' ' + lastName}, firstName, lastName)"));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
 
     GenericRow genericRow = new GenericRow();
@@ -220,7 +220,7 @@ public class ExpressionTransformerTest {
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "incoming"),
             new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"outgoing")).build();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, 
null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
     expressionTransformer = new ExpressionTransformer(tableConfig, 
pinotSchema);
 
     genericRow = new GenericRow();
@@ -244,7 +244,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("a", "plus(b, 10)"));
     transformConfigs.add(new TransformConfig("c", "plus(a, d)"));
     transformConfigs.add(new TransformConfig("f", "plus(e, 10)"));
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null);
+    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testDerivedFunctions")
         .setIngestionConfig(ingestionConfig).build();
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, schema);
@@ -272,7 +272,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("a", "plus(b,10)"));
     transformConfigs.add(new TransformConfig("a", "plus(c,10)"));
 
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null);
+    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testMultipleTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
@@ -295,7 +295,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("d", "plus(e,10)"));
     transformConfigs.add(new TransformConfig("c", "plus(d,e)"));
 
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null);
+    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testNonCyclicTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
@@ -323,7 +323,7 @@ public class ExpressionTransformerTest {
     transformConfigs.add(new TransformConfig("b", "plus(c,10)"));
     transformConfigs.add(new TransformConfig("c", "plus(a,10)"));
 
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null);
+    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testRecrusiveTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index f76551c0f4..f479fc351b 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -92,7 +92,7 @@ public class RecordTransformerTest {
     // expression false, not filtered
     GenericRow genericRow = getRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt > 
123}, svInt)"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt > 
123}, svInt)"), null, null, null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -100,7 +100,7 @@ public class RecordTransformerTest {
     // expression true, filtered
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <= 
123}, svInt)"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <= 
123}, svInt)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -108,14 +108,14 @@ public class RecordTransformerTest {
     // value not found
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent 
== 123}, notPresent)"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent 
== 123}, notPresent)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // invalid function
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy(svInt == 
123)"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("Groovy(svInt == 
123)"), null, null, null));
     try {
       new FilterTransformer(tableConfig);
       Assert.fail("Should have failed constructing FilterTransformer");
@@ -126,7 +126,7 @@ public class RecordTransformerTest {
     // multi value column
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new 
FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null));
+        new IngestionConfig(null, null, new 
FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -187,7 +187,7 @@ public class RecordTransformerTest {
     GenericRow genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svInt = 123"), null, null));
+            new FilterConfig("svInt = 123"), null, null, null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -196,7 +196,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svDouble > 120"), null, null));
+            new FilterConfig("svDouble > 120"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -205,7 +205,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svDouble >= 123"), null, null));
+            new FilterConfig("svDouble >= 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -214,7 +214,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svDouble < 200"), null, null));
+            new FilterConfig("svDouble < 200"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -223,7 +223,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svDouble <= 123"), null, null));
+            new FilterConfig("svDouble <= 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -232,7 +232,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svLong != 125"), null, null));
+            new FilterConfig("svLong != 125"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -241,7 +241,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svLong = 123"), null, null));
+            new FilterConfig("svLong = 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -249,7 +249,7 @@ public class RecordTransformerTest {
     // expression true, filtered
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 
125)"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 
125)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -272,21 +272,23 @@ public class RecordTransformerTest {
     // expression true, filtered
     GenericRow genericRow = getNullColumnsRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("svNullString is 
null"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("svNullString is 
null"), null, null, null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
     genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("svInt is not null"), null, null));
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("svInt is not null"), 
null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
     genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("mvLong is not null"), null, null));
+    tableConfig.setIngestionConfig(
+        new IngestionConfig(null, null, new FilterConfig("mvLong is not 
null"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -294,7 +296,7 @@ public class RecordTransformerTest {
     // expression true, filtered
     genericRow = getNullColumnsRecord();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("mvNullFloat is 
null"), null, null));
+        new IngestionConfig(null, null, new FilterConfig("mvNullFloat is 
null"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -308,7 +310,7 @@ public class RecordTransformerTest {
     GenericRow genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null));
+            new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null, 
null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -317,7 +319,7 @@ public class RecordTransformerTest {
     genericRow = getRecord();
     tableConfig.setIngestionConfig(
         new IngestionConfig(null, null,
-            new FilterConfig("svInt = 125 OR svLong <= 200"), null, null));
+            new FilterConfig("svInt = 125 OR svLong <= 200"), null, null, 
null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
index 0c5600c758..be0462dae3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
@@ -67,7 +67,8 @@ public class SegmentGenerationWithFilterRecordsTest {
   public void setup() {
     String filterFunction =
         "Groovy({((col2 < 1589007600000L) &&  (col3.max() < 4)) || col1 == 
\"B\"}, col1, col2, col3)";
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, new 
FilterConfig(filterFunction), null, null);
+    IngestionConfig ingestionConfig =
+        new IngestionConfig(null, null, new FilterConfig(filterFunction), 
null, null, null);
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
     _schema = new 
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN, 
FieldSpec.DataType.STRING)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index c21bc47a90..8afb04880c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -591,7 +591,8 @@ public class SegmentPreProcessorTest {
       throws Exception {
     constructV1Segment();
     _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
-        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null));
+        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
+        null));
     
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
     checkUpdateDefaultColumns();
 
@@ -636,7 +637,8 @@ public class SegmentPreProcessorTest {
     assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
 
     _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
-        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null));
+        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
+        null));
     
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
     checkUpdateDefaultColumns();
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
index f9b7cf09da..d1c5e660ee 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -138,7 +139,7 @@ public class IngestionUtilsTest {
 
     // filter config
     IngestionConfig ingestionConfig =
-        new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, 
x)"), null, null);
+        new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, 
x)"), null, null, null);
     Set<String> fields = 
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
     Assert.assertEquals(fields.size(), 1);
     Assert.assertTrue(fields.containsAll(Sets.newHashSet("x")));
@@ -152,14 +153,14 @@ public class IngestionUtilsTest {
     schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", 
FieldSpec.DataType.STRING).build();
     List<TransformConfig> transformConfigs =
         Lists.newArrayList(new TransformConfig("d1", "Groovy({function}, 
argument1, argument2)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null);
+    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
     List<String> extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 3);
     Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", 
"argument2")));
 
     // groovy function, no arguments
     transformConfigs = Lists.newArrayList(new TransformConfig("d1", 
"Groovy({function})"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null);
+    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("d1"));
@@ -167,7 +168,7 @@ public class IngestionUtilsTest {
     // inbuilt functions
     schema = new 
Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch", 
FieldSpec.DataType.LONG).build();
     transformConfigs = Lists.newArrayList(new 
TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null);
+    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn", 
"hoursSinceEpoch")));
@@ -177,7 +178,7 @@ public class IngestionUtilsTest {
         new 
Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch", 
FieldSpec.DataType.LONG).build();
     transformConfigs =
         Lists.newArrayList(new TransformConfig("tenMinutesSinceEpoch", 
"toEpochMinutesBucket(timestampColumn, 10)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null);
+    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     
Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch",
 "timestampColumn")));
@@ -187,7 +188,7 @@ public class IngestionUtilsTest {
         .addDateTime("dateColumn", FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
     transformConfigs =
         Lists.newArrayList(new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null);
+    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn", 
"timestampColumn")));
@@ -200,7 +201,7 @@ public class IngestionUtilsTest {
     transformConfigs =
         Lists.newArrayList(new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
     ingestionConfig =
-        new IngestionConfig(null, null, new FilterConfig("Groovy({d1 == 
\"10\"}, d1)"), transformConfigs, null);
+        new IngestionConfig(null, null, new FilterConfig("Groovy({d1 == 
\"10\"}, d1)"), transformConfigs, null, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 6);
     Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", 
"dateColumn", "xy", "timestampColumn")));
@@ -220,11 +221,37 @@ public class IngestionUtilsTest {
     ComplexTypeConfig complexTypeConfigs = new 
ComplexTypeConfig(fieldsToUnnest, ".",
             ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, 
prefixesToRename);
     FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)");
-    ingestionConfig = new IngestionConfig(null, null, filterConfig, 
transformConfigs, complexTypeConfigs);
+    ingestionConfig = new IngestionConfig(null, null, filterConfig, 
transformConfigs, complexTypeConfigs, null);
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 8);
     List<String> expectedColumns =
             Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", 
"timestampColumn", "before", "after");
     Assert.assertTrue(extract.containsAll(expectedColumns));
   }
+
+  @Test
+  public void testExtractFieldsAggregationConfig() {
+    Schema schema = new Schema();
+
+    List<AggregationConfig> aggregationConfigs = Arrays.asList(new 
AggregationConfig("d1", "SUM(s1)"));
+    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
null, null, aggregationConfigs);
+
+    Set<String> fields = 
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
+    Assert.assertEquals(fields.size(), 1);
+    Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MIN(s1)"));
+    ingestionConfig = new IngestionConfig(null, null, null, null, null, 
aggregationConfigs);
+
+    fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema);
+    Assert.assertEquals(fields.size(), 1);
+    Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MAX(s1)"));
+    ingestionConfig = new IngestionConfig(null, null, null, null, null, 
aggregationConfigs);
+
+    fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema);
+    Assert.assertEquals(fields.size(), 1);
+    Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b1910b540d..69ad516bfb 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -229,28 +230,30 @@ public class TableConfigUtilsTest {
 
     // null filter config, transform config
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, 
null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // null filter function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig(null), null, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig(null), null, null, null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("startsWith(columnX, 
\"myPrefix\")"), null, null)).build();
+            new IngestionConfig(null, null, new 
FilterConfig("startsWith(columnX, \"myPrefix\")"), null, null, null))
+        .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy({x == 10}, x)"), null, null))
+        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy({x == 10}, x)"), null, null, null))
         .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid filter function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy(badExpr)"), null, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy(badExpr)"), null, null, null))
+        .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail on invalid filter function string");
@@ -259,7 +262,8 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("fakeFunction(xx)"), null, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("fakeFunction(xx)"), null, null, null))
+        .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid filter function");
@@ -269,13 +273,13 @@ public class TableConfigUtilsTest {
 
     // empty transform configs
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
Collections.emptyList(), null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, 
Collections.emptyList(), null, null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // transformed column not in schema
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
-            null)).build();
+            null, null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for transformedColumn not present in schema");
@@ -283,13 +287,21 @@ public class TableConfigUtilsTest {
       // expected
     }
 
+    // using a transformation column in an aggregation
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("twice", "col * 2")),
+            null, Lists.newArrayList((new AggregationConfig("twiceSum", 
"SUM(twice)"))))).build();
+    TableConfigUtils.validate(tableConfig, schema);
+
     schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .build();
     // valid transform configs
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
-            null)).build();
+            null, null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     schema =
@@ -298,7 +310,7 @@ public class TableConfigUtilsTest {
     // valid transform configs
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)"),
-            new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")), 
null)).build();
+            new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")), 
null, null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid transform config since Groovy is disabled
@@ -311,7 +323,7 @@ public class TableConfigUtilsTest {
 
     // invalid filter config since Groovy is disabled
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, new 
FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null))
+            new IngestionConfig(null, null, new 
FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null, null))
         .build();
     try {
       TableConfigUtils.validate(tableConfig, schema, null, true);
@@ -323,7 +335,7 @@ public class TableConfigUtilsTest {
     // null transform column name
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig(null, "reverse(anotherCol)")),
-            null)).build();
+            null, null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null column name in transform config");
@@ -333,7 +345,8 @@ public class TableConfigUtilsTest {
 
     // null transform function string
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", null)), null)).build();
+            new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", null)), null, null))
+        .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null transform function in transform 
config");
@@ -344,7 +357,7 @@ public class TableConfigUtilsTest {
     // invalid function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "fakeFunction(col)")),
-            null)).build();
+            null, null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
@@ -354,7 +367,7 @@ public class TableConfigUtilsTest {
 
     // invalid function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")),
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")), null,
             null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -365,8 +378,8 @@ public class TableConfigUtilsTest {
 
     // input field name used as destination field
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(myCol)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(myCol)")), null,
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
@@ -376,8 +389,9 @@ public class TableConfigUtilsTest {
 
     // input field name used as destination field
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null,
-            Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y + 
myCol}, x, myCol, y)")), null)).build();
+            new IngestionConfig(null, null, null,
+                Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y 
+ myCol}, x, myCol, y)")), null, null))
+        .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
@@ -389,7 +403,7 @@ public class TableConfigUtilsTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null,
             Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)")),
-            null)).build();
+            null, null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to duplicate transform config");
@@ -400,16 +414,15 @@ public class TableConfigUtilsTest {
     // derived columns - should pass
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("transformedCol", "reverse(x)"),
-            new TransformConfig("myCol", "lower(transformedCol)")), 
null)).build();
+            new TransformConfig("myCol", "lower(transformedCol)")), null, 
null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid field name in schema with matching prefix from 
complexConfigType's prefixesToRename
     HashMap<String, String> prefixesToRename = new HashMap<>();
     prefixesToRename.put("after.", "");
     ComplexTypeConfig complexConfig = new ComplexTypeConfig(null, ".", null, 
prefixesToRename);
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, null,
-                    null, complexConfig)).build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setIngestionConfig(new IngestionConfig(null, null, null, null, 
complexConfig, null)).build();
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
             .addMultiValueDimension("after.test", 
FieldSpec.DataType.STRING).build();
     try {
@@ -420,12 +433,181 @@ public class TableConfigUtilsTest {
     }
   }
 
+  /**
+   * Runs TableConfigUtils.validateIngestionConfig over a sequence of REALTIME table configs that carry
+   * aggregationConfigs. Every invalid combination must raise IllegalStateException (the test fails via
+   * Assert.fail otherwise); the two valid combinations must validate without throwing.
+   */
+  @Test
+  public void ingestionAggregationConfigsTest() {
+    List<AggregationConfig> aggregationConfigs = Arrays.asList(new 
AggregationConfig("d1", "SUM(s1)"));
+    IngestionConfig ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            
.setIngestionConfig(ingestionConfig).setAggregateMetrics(true).build();
+
+    // aggregationConfigs together with aggregateMetrics=true must be rejected
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to aggregateMetrics being set");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // schema without any columns: d1 from the aggregation config is not defined
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to destination column not being in schema");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("d1", 
FieldSpec.DataType.DOUBLE)
+            .build();
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to aggregation column being a dimension");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // from here on the schema declares d1 as a metric again
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
+    aggregationConfigs = Arrays.asList(new AggregationConfig(null, null));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to null columnName/aggregationFunction");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"), 
new AggregationConfig("d1", "SUM(s2)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to duplicate destination column");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM s1"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to invalid aggregation function");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", 
"DISTINCTCOUNTHLL(s1)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to not supported aggregation function");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "s1 + s2"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to multiple arguments");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1 - 
s2)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to inner value not being a column");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // valid: the function argument and the configured column are both d1; must not throw
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(d1)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+    // schema gains a second metric d2 that has no aggregation config
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE)
+        .addMetric("d2", FieldSpec.DataType.DOUBLE).build();
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+      Assert.fail("Should fail due to one metric column not being aggregated");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+
+    // valid: the single metric d1 is configured with SUM(s1); must not throw
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
+    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
+    ingestionConfig =
+        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+  }
+
   @Test
   public void ingestionStreamConfigsTest() {
     Map<String, String> streamConfigs = getStreamConfigs();
     IngestionConfig ingestionConfig =
         new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null,
-            null, null);
+            null, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
             .setIngestionConfig(ingestionConfig).build();
@@ -440,7 +622,7 @@ public class TableConfigUtilsTest {
 
     // stream config should be valid
     ingestionConfig =
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null);
+        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null, 
null);
     tableConfig.setIngestionConfig(ingestionConfig);
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
@@ -465,7 +647,7 @@ public class TableConfigUtilsTest {
 
     IngestionConfig ingestionConfig =
         new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, 
null),
-            null, null, null, null);
+            null, null, null, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
             .build();
@@ -486,12 +668,12 @@ public class TableConfigUtilsTest {
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
         .setIngestionConfig(new IngestionConfig(
             new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, 
batchConfigMap), "REFRESH", null), null, null,
-            null, null)).build();
+            null, null, null)).build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
     // dimension tables should have batch ingestion config
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, 
null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for Dimension table without batch ingestion 
config");
@@ -503,7 +685,7 @@ public class TableConfigUtilsTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
         .setIngestionConfig(new IngestionConfig(
             new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, 
batchConfigMap), "APPEND", null), null, null,
-            null, null)).build();
+            null, null, null)).build();
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for Dimension table with ingestion type APPEND 
(should be REFRESH)");
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java
new file mode 100644
index 0000000000..811af8bbbd
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/AggregationConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * One entry of the ingestion config's {@code aggregationConfigs} list: a column name paired with an
+ * aggregation function expression, e.g. {@code new AggregationConfig("d1", "SUM(s1)")}.
+ *
+ * <p>Both values are stored in final fields and are bound from the JSON properties {@code columnName}
+ * and {@code aggregationFunction}. The constructor performs no validation, so either value may be null.
+ */
+public class AggregationConfig extends BaseJsonConfig {
+
+  @JsonPropertyDescription("Aggregated column name")
+  private final String _columnName;
+
+  @JsonPropertyDescription("Aggregation function")
+  private final String _aggregationFunction;
+
+  /**
+   * @param columnName value of the {@code columnName} JSON property; stored as-is
+   * @param aggregationFunction value of the {@code aggregationFunction} JSON property; stored as-is
+   */
+  @JsonCreator
+  public AggregationConfig(@JsonProperty("columnName") String columnName,
+      @JsonProperty("aggregationFunction") String aggregationFunction) {
+    _columnName = columnName;
+    _aggregationFunction = aggregationFunction;
+  }
+
+  /** Returns the column name exactly as passed to the constructor. */
+  public String getColumnName() {
+    return _columnName;
+  }
+
+  /** Returns the aggregation function expression exactly as passed to the constructor. */
+  public String getAggregationFunction() {
+    return _aggregationFunction;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 8a1ea11b67..6b8329410a 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -47,17 +47,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to handling complex type")
   private ComplexTypeConfig _complexTypeConfig;
 
+  @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
+  private List<AggregationConfig> _aggregationConfigs;
+
+
   @JsonCreator
   public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable 
BatchIngestionConfig batchIngestionConfig,
       @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig 
streamIngestionConfig,
       @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
       @JsonProperty("transformConfigs") @Nullable List<TransformConfig> 
transformConfigs,
-      @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig 
complexTypeConfig) {
+      @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig 
complexTypeConfig,
+      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> 
aggregationConfigs) {
     _batchIngestionConfig = batchIngestionConfig;
     _streamIngestionConfig = streamIngestionConfig;
     _filterConfig = filterConfig;
     _transformConfigs = transformConfigs;
     _complexTypeConfig = complexTypeConfig;
+    _aggregationConfigs = aggregationConfigs;
   }
 
   public IngestionConfig() {
@@ -88,6 +94,11 @@ public class IngestionConfig extends BaseJsonConfig {
     return _complexTypeConfig;
   }
 
+  /** Returns the ingestion-time aggregation configs; may be null when none have been set. */
+  @Nullable
+  public List<AggregationConfig> getAggregationConfigs() {
+    return _aggregationConfigs;
+  }
+
   public void setBatchIngestionConfig(BatchIngestionConfig 
batchIngestionConfig) {
     _batchIngestionConfig = batchIngestionConfig;
   }
@@ -107,4 +118,8 @@ public class IngestionConfig extends BaseJsonConfig {
   public void setComplexTypeConfig(ComplexTypeConfig complexTypeConfig) {
     _complexTypeConfig = complexTypeConfig;
   }
+
+  /** Replaces the ingestion-time aggregation configs with the given list (stored by reference, not copied). */
+  public void setAggregationConfigs(List<AggregationConfig> 
aggregationConfigs) {
+    _aggregationConfigs = aggregationConfigs;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index e89b28e831..ba8aefb1ee 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
@@ -73,6 +74,17 @@ public final class IngestionConfigUtils {
     return streamConfigMap;
   }
 
+  /**
+   * Fetches the aggregationConfigs from the ingestionConfig of the given table config.
+   *
+   * <p>The table must be of type REALTIME; for any other table type the Preconditions.checkState call
+   * fails before the ingestion config is read.
+   *
+   * @return the configured aggregationConfigs, or null when the table config has no ingestionConfig
+   *         (the returned list itself may also be null if the ingestionConfig does not define any)
+   */
+  public static List<AggregationConfig> getAggregationConfigs(TableConfig 
tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        "aggregationConfigs are only supported in REALTIME tables. Found a 
OFFLINE table: %s", tableNameWithType);
+
+    if (tableConfig.getIngestionConfig() != null) {
+      return tableConfig.getIngestionConfig().getAggregationConfigs();
+    }
+    return null;
+  }
+
   /**
    * Fetches the configured segmentIngestionType (APPEND/REFRESH) from the 
table config
    * First checks in the ingestionConfig. If not found, checks in the 
segmentsConfig (has been deprecated from here
@@ -178,8 +190,8 @@ public final class IngestionConfigUtils {
    * Extracts the segment name generator type from the batchConfigMap, or 
returns default value if not found
    */
   public static String getSegmentNameGeneratorType(Map<String, String> 
batchConfigMap) {
-    return batchConfigMap
-        .getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, 
DEFAULT_SEGMENT_NAME_GENERATOR_TYPE);
+    return 
batchConfigMap.getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+        DEFAULT_SEGMENT_NAME_GENERATOR_TYPE);
   }
 
   /**
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index cb0dc7ba9e..159b3c0d6b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -99,6 +99,7 @@ public class TableConfigBuilder {
   private List<String> _varLengthDictionaryColumns;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private List<String> _jsonIndexColumns;
+  private boolean _aggregateMetrics;
 
   private TableCustomConfig _customConfig;
   private QuotaConfig _quotaConfig;
@@ -284,6 +285,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  /**
+   * Records the aggregateMetrics flag on this builder; the stored value is later handed to
+   * indexingConfig.setAggregateMetrics. Returns this builder to allow call chaining.
+   */
+  public TableConfigBuilder setAggregateMetrics(boolean aggregateMetrics) {
+    _aggregateMetrics = aggregateMetrics;
+    return this;
+  }
+
   public TableConfigBuilder setStreamConfigs(Map<String, String> 
streamConfigs) {
     Preconditions.checkState(_tableType == TableType.REALTIME);
     _streamConfigs = streamConfigs;
@@ -404,6 +410,7 @@ public class TableConfigBuilder {
     indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
     indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
     indexingConfig.setJsonIndexColumns(_jsonIndexColumns);
+    indexingConfig.setAggregateMetrics(_aggregateMetrics);
 
     if (_customConfig == null) {
       _customConfig = new TableCustomConfig(null);
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index 1121f87f90..e9831ea622 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -56,7 +56,8 @@ public class IngestionConfigUtilsTest {
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put("streamType", "kafka");
     tableConfig.setIngestionConfig(
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null));
+        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null,
+            null));
     Map<String, String> actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
@@ -74,7 +75,8 @@ public class IngestionConfigUtilsTest {
 
     // fail if multiple found
     tableConfig.setIngestionConfig(new IngestionConfig(null,
-        new StreamIngestionConfig(Lists.newArrayList(streamConfigMap, 
deprecatedStreamConfigMap)), null, null, null));
+        new StreamIngestionConfig(Lists.newArrayList(streamConfigMap, 
deprecatedStreamConfigMap)), null, null, null,
+        null));
     try {
       IngestionConfigUtils.getStreamConfigMap(tableConfig);
       Assert.fail("Should fail for multiple stream configs");
@@ -103,7 +105,7 @@ public class IngestionConfigUtilsTest {
     // get from ingestion config, when not present in segmentsConfig
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null));
+        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
     
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig),
 "HOURLY");
 
     // get from ingestion config, even if present in segmentsConfig
@@ -128,7 +130,7 @@ public class IngestionConfigUtilsTest {
     // get from ingestion config, when not present in segmentsConfig
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
     tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null));
+        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
     
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
 "APPEND");
 
     // get from ingestion config, even if present in segmentsConfig


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to